1 // Copyright (C) 2006-2021 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
30 #include <algorithm> // std::replace_if
31 #include <functional> // std::bind
34 #include "YacsTrace.hxx"
40 using namespace YACS::ENGINE;
43 /*! \class YACS::ENGINE::ForEachLoop
44 * \brief Loop node for parametric calculation
49 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
51 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
53 const char ForEachLoopGen::NAME_OF_SPLITTERNODE[]="splitter";
55 const int ForEachLoopGen::NOT_RUNNING_BRANCH_ID=-1;
57 const char ForEachLoopGen::INTERCEPTOR_STR[]="_interceptor";
59 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
60 DataPort(name,node,type),Port(node),
65 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
66 Port(other,newHelder),
71 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
73 set<InPort *> ports=_repr->edSetInPort();
74 for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
75 (*iter)->getAllRepresentants(repr);
78 InputPort *InterceptorInputPort::clone(Node *newHelder) const
80 return new InterceptorInputPort(*this,newHelder);
83 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
88 bool AnySplitOutputPort::decrRef()
93 void AnySplitOutputPort::incrRef() const
98 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
99 DataPort(name,node,type),Port(node),
100 _repr(0),_intercptr(0),_cnt(1)
104 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
105 DataPort(other,newHelder),
106 Port(other,newHelder),
107 _repr(0),_intercptr(0),_cnt(1)
111 bool AnySplitOutputPort::addInPort(InPort *inPort)
113 bool ret=OutputPort::addInPort(inPort);
115 _repr->addInPort(_intercptr);
119 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
122 OutPort::getAllRepresented(represented);
124 _repr->getAllRepresented(represented);
127 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward)
129 bool ret=OutputPort::removeInPort(inPort,forward);
131 if(_setOfInputPort.empty())
132 _repr->removeInPort(_intercptr,forward);
136 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
139 _intercptr=intercptr;
142 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
144 return new AnySplitOutputPort(*this,newHelder);
147 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
152 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
156 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
158 return new SeqAnyInputPort(*this,newHelder);
161 unsigned SeqAnyInputPort::getNumberOfElements() const
163 const SequenceAny * valCsted=(const SequenceAny *) _value;
164 if (valCsted) return valCsted->size();
168 Any *SeqAnyInputPort::getValueAtRank(int i) const
170 const SequenceAny * valCsted=(const SequenceAny *) _value;
171 AnyPtr ret=(*valCsted)[i];
176 std::string SeqAnyInputPort::dump()
178 stringstream xmldump;
179 int nbElem = getNumberOfElements();
180 xmldump << "<value><array><data>" << endl;
181 for (int i = 0; i < nbElem; i++)
183 Any *val = getValueAtRank(i);
184 switch (((YACS::ENGINE::TypeCodeSeq *)edGetType())->contentType()->kind())
187 xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
190 xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
193 xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
196 xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
199 xmldump << "<value><objref>" << ToBase64(val->getStringValue()) << "</objref></value>" << endl;
202 xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
206 xmldump << "</data></array></value>" << endl;
207 return xmldump.str();
210 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
211 ForEachLoopGen *father):ElementaryNode(name),
212 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
213 this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
218 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoopGen *father):ElementaryNode(other,father),
219 _dataPortToDispatch(other._dataPortToDispatch,this)
223 InputPort *SplitterNode::getInputPort(const std::string& name) const
225 if(name==NAME_OF_SEQUENCE_INPUT)
226 return (InputPort *)&_dataPortToDispatch;
228 return ElementaryNode::getInputPort(name);
231 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
233 return new SplitterNode(*this,(ForEachLoopGen *)father);
236 unsigned SplitterNode::getNumberOfElements() const
238 return _dataPortToDispatch.getNumberOfElements();
241 void SplitterNode::execute()
243 //Nothing : should never been called elsewhere big problem...
246 void SplitterNode::init(bool start)
248 ElementaryNode::init(start);
249 _dataPortToDispatch.exInit(start);
252 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
254 Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
255 ForEachLoopGen *fatherTyped=(ForEachLoopGen *)_father;
256 fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
257 valueToDispatch->decrRef();
260 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoopGen *loop, bool normalFinish):ElementaryNode(NAME),
262 _normalFinish(normalFinish)
264 _state=YACS::TOACTIVATE;
265 _father=_loop->getFather();
268 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
273 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
275 return new FakeNodeForForEachLoop(*this);
278 void FakeNodeForForEachLoop::exForwardFailed()
280 _loop->exForwardFailed();
283 void FakeNodeForForEachLoop::exForwardFinished()
285 _loop->exForwardFinished();
288 void FakeNodeForForEachLoop::execute()
291 throw Exception("");//only to trigger ABORT on Executor
293 _loop->pushAllSequenceValues();
296 void FakeNodeForForEachLoop::aborted()
298 _loop->setState(YACS::ERROR);
301 void FakeNodeForForEachLoop::finished()
303 _loop->setState(YACS::DONE);
306 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
308 std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
310 throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
311 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
313 const SequenceAny *elt(*it);
315 if(sz!=(std::size_t)elt->size())
316 throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
318 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
320 SequenceAny *elt(*it);
326 ForEachLoopPassedData::ForEachLoopPassedData(const ForEachLoopPassedData& copy)
327 : _passedIds(copy._passedIds),
328 _passedOutputs(copy._passedOutputs),
329 _nameOfOutputs(copy._nameOfOutputs),
330 _flagsIds(copy._flagsIds)
334 ForEachLoopPassedData::~ForEachLoopPassedData()
336 for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
338 SequenceAny *elt(*it);
344 void ForEachLoopPassedData::init()
349 void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
352 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
353 std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
354 if(nbOfElts2<sizeExp)
355 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
356 for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
359 throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
361 _flagsIds.resize(nbOfElts);
362 std::fill(_flagsIds.begin(),_flagsIds.end(),false);
363 for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
371 std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
372 throw YACS::Exception(oss.str());
377 std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
378 throw YACS::Exception(oss.str());
383 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
385 std::size_t sz(_nameOfOutputs.size());
387 throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
388 for(std::size_t i=0;i<sz;i++)
390 AnyInputPort *elt(ports[i]);
392 throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
393 if(_nameOfOutputs[i]!=elt->getName())
395 std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
396 throw YACS::Exception(oss.str());
402 * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
404 int ForEachLoopPassedData::toAbsId(int localId) const
407 throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
408 int ret(0),curLocId(0);
409 for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
413 if(localId==curLocId)
418 throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
422 * Equivalent to toAbsId except that only ON are considered here.
424 int ForEachLoopPassedData::toAbsIdNot(int localId) const
427 throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
428 int ret(0),curLocId(0);
429 for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
431 if(*it)//<- diff is here !
433 if(localId==curLocId)
438 throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
441 int ForEachLoopPassedData::getNumberOfElementsToDo() const
443 std::size_t nbAllElts(_flagsIds.size());
444 std::size_t ret(nbAllElts-_passedIds.size());
448 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
450 std::size_t sz(execVals.size());
451 if(_passedOutputs.size()!=sz)
452 throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
453 for(std::size_t i=0;i<sz;i++)
455 SequenceAny *elt(_passedOutputs[i]);
456 SequenceAny *eltDestination(execVals[i]);
458 throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
459 unsigned int szOfElt(elt->size());
460 for(unsigned int j=0;j<szOfElt;j++)
462 AnyPtr elt1((*elt)[j]);
463 int jAbs(toAbsIdNot(j));
464 eltDestination->setEltAtRank(jAbs,elt1);
469 ForEachLoopGen::ForEachLoopGen(const std::string& name, TypeCode *typeOfDataSplitted, std::unique_ptr<NbBranchesAbstract>&& branchManager):
470 DynParaLoop(name,typeOfDataSplitted,std::move(branchManager)),
471 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
472 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
476 ForEachLoopGen::ForEachLoopGen(const ForEachLoopGen& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
477 _splitterNode(other._splitterNode,this),
478 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
482 for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
484 AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
485 InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
486 temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
487 interc->setRepr(temp);
488 _outGoingPorts.push_back(temp);
489 _intecptrsForOutGoingPorts.push_back(interc);
493 ForEachLoopGen::~ForEachLoopGen()
496 for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
498 for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
503 void ForEachLoopGen::init(bool start)
505 DynParaLoop::init(start);
506 _splitterNode.init(start);
515 void ForEachLoopGen::exUpdateState()
517 DEBTRACE("ForEachLoopGen::exUpdateState");
518 if(_state == YACS::DISABLED)
520 if(_state == YACS::DONE)
522 if(_inGate.exIsReady())
524 //internal graph update
526 int nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
527 int nbOfBr(_nbOfBranches->getNumberOfBranches(nbOfElts));
530 _passedData->checkCompatibilyWithNb(nbOfElts);
531 nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
533 int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
535 DEBTRACE("nbOfElts=" << nbOfElts);
536 DEBTRACE("nbOfBr=" << nbOfBr);
540 prepareSequenceValues(0);
541 delete _nodeForSpecialCases;
542 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
543 setState(YACS::ACTIVATED);
548 delete _nodeForSpecialCases;
549 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
550 setState(YACS::ACTIVATED);
553 if(nbOfBr>nbOfEltsToDo)
555 _execNodes.resize(nbOfBr);
556 _execIds.resize(nbOfBr);
557 _execOutGoingPorts.resize(nbOfBr);
558 prepareSequenceValues(nbOfElts);
560 _execInitNodes.resize(nbOfBr);
561 _initializingCounter = 0;
563 _execFinalizeNodes.resize(nbOfBr);
565 vector<Node *> origNodes;
566 origNodes.push_back(_initNode);
567 origNodes.push_back(_node);
568 origNodes.push_back(_finalizeNode);
570 //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
571 //so catch them to control errors
574 for(i=0;i<nbOfBr;i++)
576 DEBTRACE( "-------------- 2" );
577 vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
579 _execInitNodes[i] = clonedNodes[0];
580 _execNodes[i] = clonedNodes[1];
582 _execFinalizeNodes[i] = clonedNodes[2];
583 DEBTRACE( "-------------- 4" );
584 prepareInputsFromOutOfScope(i);
585 DEBTRACE( "-------------- 5" );
586 createOutputOutOfScopeInterceptors(i);
587 DEBTRACE( "-------------- 6" );
589 for(i=0;i<nbOfBr;i++)
591 DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
592 _execIds[i]=_execCurrentId;
593 int posInAbs(_execCurrentId);
595 posInAbs=_passedData->toAbsId(_execCurrentId);
596 _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
598 DEBTRACE( "-------------- 7" );
602 _passedData->checkLevel2(_execOutGoingPorts[0]);
603 _passedData->assignAlreadyDone(_execVals);
605 // clean inputs data coming from the outside in _node
606 set< InPort * > portsToSetVals=getAllInPortsComingFromOutsideOfCurrentScope();
607 for(auto iter : portsToSetVals)
609 InputPort *curPortCasted=(InputPort *) iter;//Cast granted by ForEachLoopGen::buildDelegateOf(InPort)
610 if(!curPortCasted->canSafelySqueezeMemory())// this can appear strange ! if not safelySqueeze -> release. Nevertheless it is true.
611 curPortCasted->releaseData(); // these input ports have been incremented with InputPort::put into DynParaLoop::prepareInputsFromOutOfScope. So they can be released now.
614 catch(YACS::Exception& ex)
616 //ForEachLoop must be put in error and the exception rethrown to notify the caller
617 DEBTRACE( "ForEachLoopGen::exUpdateState: " << ex.what() );
618 setState(YACS::ERROR);
619 setErrorDetails(ex.what());
624 setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
627 for(i=0;i<nbOfBr;i++)
630 _execInitNodes[i]->exUpdateState();
631 _initializingCounter++;
636 _execNodes[i]->exUpdateState();
639 forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
643 void ForEachLoopGen::exUpdateProgress()
645 // emit notification to all observers registered with the dispatcher on any change of the node's state
646 sendEvent("progress");
649 void ForEachLoopGen::getReadyTasks(std::vector<Task *>& tasks)
653 if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
654 if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
656 if(_nodeForSpecialCases)
658 _nodeForSpecialCases->getReadyTasks(tasks);
661 vector<Node *>::iterator iter;
662 for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
663 (*iter)->getReadyTasks(tasks);
664 for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
665 (*iter)->getReadyTasks(tasks);
666 for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
667 (*iter)->getReadyTasks(tasks);
671 int ForEachLoopGen::getNumberOfInputPorts() const
673 return DynParaLoop::getNumberOfInputPorts()+1;
676 void ForEachLoopGen::checkNoCyclePassingThrough(Node *node)
681 void ForEachLoopGen::selectRunnableTasks(std::vector<Task *>& tasks)
685 std::list<InputPort *> ForEachLoopGen::getSetOfInputPort() const
687 list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
688 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
692 std::list<InputPort *> ForEachLoopGen::getLocalInputPorts() const
694 list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
695 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
699 InputPort *ForEachLoopGen::getInputPort(const std::string& name) const
701 if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
702 return (InputPort *)&_splitterNode._dataPortToDispatch;
704 return DynParaLoop::getInputPort(name);
707 OutputPort *ForEachLoopGen::getOutputPort(const std::string& name) const
709 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
711 if(name==(*iter)->getName())
712 return (OutputPort *)(*iter);
714 return DynParaLoop::getOutputPort(name);
717 OutPort *ForEachLoopGen::getOutPort(const std::string& name) const
719 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
721 if(name==(*iter)->getName())
722 return (OutPort *)(*iter);
724 return DynParaLoop::getOutPort(name);
727 Node *ForEachLoopGen::getChildByShortName(const std::string& name) const
729 if(name==NAME_OF_SPLITTERNODE)
730 return (Node *)&_splitterNode;
732 return DynParaLoop::getChildByShortName(name);
735 //! Method used to notify the node that a child node has finished
737 * Update the current state and return the change state
739 * \param node : the child node that has finished
740 * \return the state change
742 YACS::Event ForEachLoopGen::updateStateOnFinishedEventFrom(Node *node)
744 DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
746 switch(getIdentityOfNotifyerNode(node,id))
749 return updateStateForInitNodeOnFinishedEventFrom(node,id);
751 return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
753 return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
757 return YACS::NOEVENT;
760 YACS::Event ForEachLoopGen::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
762 _execNodes[id]->exUpdateState();
764 _initializingCounter--;
766 if (_initializingCounter == 0)
767 _initNode->setState(DONE);
768 return YACS::NOEVENT;
772 * \param [in] isNormalFinish - if true
774 YACS::Event ForEachLoopGen::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
780 int globalId(_execIds[id]);
782 globalId=_passedData->toAbsId(globalId);
783 sendEvent2("progress_ok",&globalId);
784 storeOutValsInSeqForOutOfScopeUse(globalId,id);
788 int globalId(_execIds[id]);
790 globalId=_passedData->toAbsId(globalId);
791 sendEvent2("progress_ko",&globalId);
794 if(_execCurrentId==getFinishedId())
795 {//No more elements of _dataPortToDispatch to treat
796 _execIds[id]=NOT_RUNNING_BRANCH_ID;
797 //analyzing if some samples are still on treatment on other branches.
798 bool isFinished(true);
799 for(int i=0;i<_execIds.size() && isFinished;i++)
800 isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
805 if(_failedCounter!=0)
806 {// case of keepgoing mode + a failed
807 std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
808 DEBTRACE("ForEachLoopGen::updateStateOnFinishedEventFrom : "<< oss.str());
809 setState(YACS::FAILED);
812 pushAllSequenceValues();
816 _node->setState(YACS::DONE);
818 ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
821 std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
822 std::list<Node *>::iterator iter=aChldn.begin();
823 for(;iter!=aChldn.end();iter++)
824 (*iter)->setState(YACS::DONE);
828 if (_finalizeNode == NULL)
830 // No finalize node, we just finish the loop at the end of exec nodes execution
831 setState(YACS::DONE);
836 // Run the finalize nodes, the loop will be done only when they all finish
837 _unfinishedCounter = 0; // This counter indicates how many branches are not finished
838 for (int i=0 ; i<_execIds.size() ; i++)
840 YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
841 DEBTRACE("Launching finalize node for branch " << i);
842 _execFinalizeNodes[i]->exUpdateState();
843 _unfinishedCounter++;
845 return YACS::NOEVENT;
848 catch(YACS::Exception& ex)
850 DEBTRACE("ForEachLoopGen::updateStateOnFinishedEventFrom: "<<ex.what());
851 //no way to push results : put following nodes in FAILED state
852 //TODO could be more fine grain : put only concerned nodes in FAILED state
854 setState(YACS::ERROR);
859 else if(_state == YACS::ACTIVATED)
860 {//more elements to do and loop still activated
861 _execIds[id]=_execCurrentId;
862 int posInAbs(_execCurrentId);
864 posInAbs=_passedData->toAbsId(_execCurrentId);
865 _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
866 //forwardExecStateToOriginalBody(node);
869 node->exUpdateState();
870 //forwardExecStateToOriginalBody(node);
874 {//elements to process and loop no more activated
875 DEBTRACE("foreach loop state " << _state);
877 return YACS::NOEVENT;
880 YACS::Event ForEachLoopGen::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
882 DEBTRACE("Finalize node finished on branch " << id);
883 _unfinishedCounter--;
886 DEBTRACE(_unfinishedCounter << " finalize nodes still running");
887 if (_unfinishedCounter == 0)
889 _finalizeNode->setState(YACS::DONE);
890 setState(YACS::DONE);
894 return YACS::NOEVENT;
897 YACS::Event ForEachLoopGen::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
900 DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
901 // TODO: deal with keepgoing without the dependency to Executor
902 if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
903 return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
907 return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
911 void ForEachLoopGen::InterceptorizeNameOfPort(std::string& portName)
913 std::replace_if(portName.begin(), portName.end(), std::bind(std::equal_to<char>(), '.',std::placeholders::_1), '_');
914 portName += INTERCEPTOR_STR;
917 void ForEachLoopGen::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
919 DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
920 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
921 if(typeOfPortInstance==OutputPort::NAME)
923 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
925 for(;iter!=_outGoingPorts.end();iter++,i++)
926 if((*iter)->getRepr()==port.first || *iter==port.first)
928 if(iter!=_outGoingPorts.end())
930 if(*iter!=port.first)
933 (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
939 TypeCode *tcTrad((YACS::ENGINE::TypeCode*)finalTarget->edGetType()->subContentType(getFEDeltaBetween(port.first,finalTarget)));
940 TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",tcTrad);
941 // The out going ports belong to the ForEachLoop, whereas
942 // the delegated port belongs to a node child of the ForEachLoop.
943 // The name of the delegated port contains dots (bloc.node.outport),
944 // whereas the name of the out going port shouldn't do.
945 std::string outputPortName(getPortName(port.first));
946 InterceptorizeNameOfPort(outputPortName);
947 AnySplitOutputPort *newPort(new AnySplitOutputPort(outputPortName,this,newTc));
948 InterceptorInputPort *intercptor(new InterceptorInputPort(outputPortName + "_in",this,tcTrad));
949 intercptor->setRepr(newPort);
951 newPort->addRepr(port.first,intercptor);
952 _outGoingPorts.push_back(newPort);
953 _intecptrsForOutGoingPorts.push_back(intercptor);
958 throw Exception("ForEachLoopGen::buildDelegateOf : not implemented for DS because not specified");
961 void ForEachLoopGen::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
963 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
964 if(typeOfPortInstance==OutputPort::NAME)
966 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
967 for(;iter!=_outGoingPorts.end();iter++)
968 if((*iter)->getRepr()==port.first)
970 if(iter==_outGoingPorts.end())
972 string what("ForEachLoopGen::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
973 throw Exception(what);
979 throw Exception("ForEachLoopGen::getDelegateOf : not implemented because not specified");
982 void ForEachLoopGen::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
984 string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
985 if(typeOfPortInstance==OutputPort::NAME)
987 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
988 vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
989 for(;iter!=_outGoingPorts.end();iter++,iter2++)
990 if((*iter)->getRepr()==portDwn)
992 //ASSERT(portUp==*iter.second)
993 if((*iter)->decrRef())
995 AnySplitOutputPort *p=*iter;
996 _outGoingPorts.erase(iter);
998 InterceptorInputPort *ip=*iter2;
999 _intecptrsForOutGoingPorts.erase(iter2);
1005 OutPort *ForEachLoopGen::getDynOutPortByAbsName(int branchNb, const std::string& name)
1007 string portName, nodeName;
1008 splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
1009 Node *staticChild = getChildByName(nodeName);
1010 return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoopGen::buildDelegateOf)
1011 //that a link starting from _initNode goes out of scope of 'this'.
1014 void ForEachLoopGen::cleanDynGraph()
1016 DynParaLoop::cleanDynGraph();
1017 for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
1018 (*iter3)->decrRef();
1020 for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
1021 for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
1023 _execOutGoingPorts.clear();
1026 void ForEachLoopGen::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
1028 vector<AnyInputPort *>::iterator iter;
1030 for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
1032 Any *val=(Any *)(*iter)->getValue();
1033 _execVals[i]->setEltAtRank(rank,val);
1037 int ForEachLoopGen::getFinishedId()
1040 return _splitterNode.getNumberOfElements();
1042 return _passedData->getNumberOfElementsToDo();
1045 void ForEachLoopGen::prepareSequenceValues(int sizeOfSamples)
1047 _execVals.resize(_outGoingPorts.size());
1048 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1049 for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1050 _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1053 void ForEachLoopGen::pushAllSequenceValues()
1055 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1057 for(;iter!=_outGoingPorts.end();iter++,i++)
1058 (*iter)->put((const void *)_execVals[i]);
1061 void ForEachLoopGen::createOutputOutOfScopeInterceptors(int branchNb)
1063 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1065 for(;iter!=_outGoingPorts.end();iter++,i++)
1067 DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1068 //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1069 OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1070 DEBTRACE( portOut->getName() );
1071 TypeCode *tc((TypeCode *)(*iter)->edGetType()->contentType());
1072 AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,tc);
1073 portOut->addInPort(interceptor);
1074 _execOutGoingPorts[branchNb].push_back(interceptor);
1078 void ForEachLoopGen::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1079 InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd)
1081 DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
1082 if(end->getNode() == &_splitterNode)
1083 throw Exception("Illegal link within a foreach loop: \
1084 the 'SmplsCollection' port cannot be linked within the scope of the loop.");
1085 if(end == _nbOfBranches->getPort())
1086 throw Exception("Illegal link within a foreach loop: \
1087 the 'nbBranches' port cannot be linked within the scope of the loop.");
1090 std::list<OutputPort *> ForEachLoopGen::getLocalOutputPorts() const
1092 list<OutputPort *> ret;
1093 ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
1097 //! Dump the node state to a stream
1099 * \param os : the output stream
1101 void ForEachLoopGen::writeDot(std::ostream &os) const
1103 os << " subgraph cluster_" << getId() << " {\n" ;
1104 //only one node in a loop
1107 _node->writeDot(os);
1108 os << getId() << " -> " << _node->getId() << ";\n";
1111 os << getId() << "[fillcolor=\"" ;
1112 YACS::StatesForNode state=getEffectiveState();
1113 os << getColorState(state);
1114 os << "\" label=\"" << "Loop:" ;
1115 os << getName() <<"\"];\n";
1118 //! Reset the state of the node and its children depending on the parameter level
1119 void ForEachLoopGen::resetState(int level)
1122 DynParaLoop::resetState(level);
1127 std::string ForEachLoopGen::getProgress() const
1129 int nbElems(getNbOfElementsToBeProcessed());
1130 std::stringstream aProgress;
1132 aProgress << _currentIndex << "/" << nbElems;
1135 return aProgress.str();
1138 //! Get the progress weight for all elementary nodes
1140 * Only elementary nodes have weight. For each node in the loop, the weight done is multiplied
1141 * by the number of elements done and the weight total by the number total of elements
1143 list<ProgressWeight> ForEachLoopGen::getProgressWeight() const
1145 list<ProgressWeight> ret;
1146 list<Node *> setOfNode=edGetDirectDescendants();
1147 int elemDone=getCurrentIndex();
1148 int elemTotal=getNbOfElementsToBeProcessed();
1149 for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
1151 list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
1152 for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
1154 (*iter).weightDone=((*iter).weightTotal) * elemDone;
1155 (*iter).weightTotal*=elemTotal;
1157 ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
1162 int ForEachLoopGen::getNbOfElementsToBeProcessed() const
1164 int nbOfElems(_splitterNode.getNumberOfElements());
1165 int nbBranches = _nbOfBranches->getNumberOfBranches(nbOfElems);
1167 + (_initNode ? nbBranches:0)
1168 + (_finalizeNode ? nbBranches:0) ;
1172 * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
1173 * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
1174 * This method has one input \a execut and 3 outputs.
1176 * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
1177 * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
1178 * All of the items in \a outputs have the same size.
1179 * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
1180 * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
1181 * SequenceAny objects contained in \a outputs.
1183 * \sa edGetSeqOfSamplesPort
1185 std::vector<unsigned int> ForEachLoopGen::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1187 YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
1188 if(_execVals.empty())
1189 return std::vector<unsigned int>();
1190 if(_execOutGoingPorts.empty())
1191 return std::vector<unsigned int>();
1192 std::size_t sz(_execVals.size());
1194 nameOfOutputs.resize(sz);
1195 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1196 for(std::size_t i=0;i<sz;i++)
1198 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1199 nameOfOutputs[i]=ports[i]->getName();
1201 return _execVals[0]->getSetItems();
1205 * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
1206 * getPassedResults method.
1208 void ForEachLoopGen::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1212 _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
1215 int ForEachLoopGen::getFEDeltaBetween(OutPort *start, InPort *end)
1217 Node *ns(start->getNode()),*ne(end->getNode());
1218 ComposedNode *co(getLowestCommonAncestor(ns,ne));
1223 ForEachLoopGen *isFE(dynamic_cast<ForEachLoopGen *>(work));
1226 work=work->getFather();
1228 if(dynamic_cast<AnySplitOutputPort *>(start))
1234 * This method is used to obtain the values already processed by the ForEachLoop.
1235 * A new ForEachLoopPassedData object is returned. You have to delete it.
1237 ForEachLoopPassedData* ForEachLoopGen::getProcessedData()const
1239 std::vector<SequenceAny *> outputs;
1240 std::vector<std::string> nameOfOutputs;
1241 if(_execVals.empty() || _execOutGoingPorts.empty())
1242 return new ForEachLoopPassedData(std::vector<unsigned int>(), outputs, nameOfOutputs);
1243 std::size_t sz(_execVals.size());
1245 nameOfOutputs.resize(sz);
1246 const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1247 for(std::size_t i=0;i<sz;i++)
1249 outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1250 nameOfOutputs[i]=ports[i]->getName();
1252 return new ForEachLoopPassedData(_execVals[0]->getSetItems(), outputs, nameOfOutputs);
1255 void ForEachLoopGen::setProcessedData(ForEachLoopPassedData* processedData)
1259 _passedData = processedData;
1263 * \param portName : "interceptorized" name of port.
1265 const YACS::ENGINE::TypeCode* ForEachLoopGen::getOutputPortType(const std::string& portName)const
1267 const YACS::ENGINE::TypeCode* ret=NULL;
1268 vector<AnySplitOutputPort *>::const_iterator it;
1269 for(it=_outGoingPorts.begin();it!=_outGoingPorts.end() && ret==NULL;it++)
1271 std::string originalPortName(getPortName(*it));
1272 //InterceptorizeNameOfPort(originalPortName);
1273 DEBTRACE("ForEachLoopGen::getOutputPortType compare " << portName << " == " << originalPortName);
1274 if(originalPortName == portName)
1276 ret = (*it)->edGetType()->contentType();
1282 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
1284 return new ForEachLoop(*this,father,editionOnly);
1287 void ForEachLoop::accept(Visitor *visitor)
1289 visitor->visitForEachLoop(this);
1292 void ForEachLoopDyn::accept(Visitor *visitor)
1294 visitor->visitForEachLoopDyn(this);
1297 Node *ForEachLoopDyn::simpleClone(ComposedNode *father, bool editionOnly) const
1299 return new ForEachLoopDyn(*this,father,editionOnly);