1 #include "ForEachLoop.hxx"
7 #include "YacsTrace.hxx"
9 using namespace YACS::ENGINE;
// Node name passed to ElementaryNode by the FakeNodeForForEachLoop constructor.
12 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
// Name of the splitter's sequence input port; getInputPort() on both SplitterNode
// and ForEachLoop compares the requested name against it.
14 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
// Name given to the embedded splitter node and matched by getChildByShortName().
16 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
// Marker stored in _execIds for a branch that has no further sample to process.
18 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
// Builds an interceptor input port: name/node/type are forwarded to the AnyInputPort
// and DataPort bases, node to the Port base.
// NOTE(review): the initialiser list continues after the trailing comma and the body
// is not visible in this excerpt.
20 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
21 DataPort(name,node,type),Port(node),
// Copy constructor (used by clone()): `other` and the new holder node `newHelder` are
// forwarded to the AnyInputPort, DataPort and Port bases.
// NOTE(review): the rest of the initialiser list and the body are not visible here.
26 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
27 Port(other,newHelder),
// Fills `repr` by delegating to getAllRepresentants() of every input port returned by
// _repr->edSetInPort().
// \param repr : set receiving the representants (passed on unchanged to each port)
32 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
// Local set holding what edSetInPort() returned; _repr is dereferenced without a null check.
34 set<InPort *> ports=_repr->edSetInPort();
35 for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
36 (*iter)->getAllRepresentants(repr);
// Returns a copy of this port, allocated with new and attached to `newHelder`
// through the copy constructor.
39 InputPort *InterceptorInputPort::clone(Node *newHelder) const
41 return new InterceptorInputPort(*this,newHelder);
44 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
49 bool AnySplitOutputPort::decrRef()
54 void AnySplitOutputPort::incrRef() const
// Builds a split output port: name/node/type go to the OutputPort and DataPort bases,
// node to the Port base.
// The port starts without a represented port (_repr=0), without an interceptor
// (_intercptr=0) and with _cnt set to 1.
59 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
60 DataPort(name,node,type),Port(node),
61 _repr(0),_intercptr(0),_cnt(1)
// Copy constructor (used by clone() and by the ForEachLoop copy constructor).
// Base sub-objects are copied from `other` onto `newHelder`, but _repr, _intercptr and
// _cnt are NOT taken from `other`: they are reset to 0, 0 and 1 as in a fresh port.
65 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
66 DataPort(other,newHelder),
67 Port(other,newHelder),
68 _repr(0),_intercptr(0),_cnt(1)
// Links `inPort` to this port through OutputPort::addInPort() and links the interceptor
// _intercptr to the represented port _repr.
// NOTE(review): any guard around the _repr call and the return statement are not
// visible in this excerpt.
72 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(Exception)
// Result of the base-class link operation.
74 bool ret=OutputPort::addInPort(inPort);
76 _repr->addInPort(_intercptr);
// Fills `represented` using OutPort::getAllRepresented() and/or the represented port
// _repr's own getAllRepresented().
// NOTE(review): the condition selecting between the two calls is not visible in this
// excerpt (presumably it depends on whether _repr is set — confirm).
80 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
83 OutPort::getAllRepresented(represented);
85 _repr->getAllRepresented(represented);
// Unlinks `inPort` from this port through OutputPort::removeInPort() and unlinks the
// interceptor _intercptr from the represented port _repr; `forward` is passed to both calls.
// NOTE(review): the function is declared to return int but stores the base-class result
// in a bool; any guard around the _repr call and the return statement are not visible here.
88 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(Exception)
90 bool ret=OutputPort::removeInPort(inPort,forward);
92 _repr->removeInPort(_intercptr,forward);
96 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
// Returns a copy of this port, allocated with new and attached to `newHelder`.
// The copy constructor resets _repr, _intercptr and _cnt, so the clone represents nothing yet.
102 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
104 return new AnySplitOutputPort(*this,newHelder);
// Builds a sequence input port: name/node/type are forwarded to the AnyInputPort and
// DataPort bases, node to the Port base. The type must be a TypeCodeSeq.
// NOTE(review): the constructor body is not visible in this excerpt.
107 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
// Copy constructor (used by clone()): `other` and `newHelder` are forwarded to the
// AnyInputPort, DataPort and Port bases.
// NOTE(review): the constructor body is not visible in this excerpt.
112 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
// Returns a copy of this port, allocated with new and attached to `newHelder`.
116 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
118 return new SeqAnyInputPort(*this,newHelder);
// Returns the number of elements of the sequence currently held in _value.
121 unsigned SeqAnyInputPort::getNumberOfElements() const
// _value is reinterpreted as a SequenceAny with an unchecked C-style cast; it is
// dereferenced below without a null check.
123 const SequenceAny * valCsted=(const SequenceAny *) _value;
124 return valCsted->size();
// Fetches element `i` of the sequence held in _value.
// \param i : index into the sequence (no bounds check in the visible lines)
// NOTE(review): the statements that turn the AnyPtr into the returned Any* are not
// visible in this excerpt; callers in this file call decrRef() on the result, so the
// returned pointer presumably carries a reference — confirm.
127 Any *SeqAnyInputPort::getValueAtRank(int i) const
// Unchecked C-style cast of _value to a SequenceAny.
129 const SequenceAny * valCsted=(const SequenceAny *) _value;
130 AnyPtr ret=(*valCsted)[i];
// Serialises the sequence held by this port as XML-like text and returns it:
// "<value><array><data>", then one "<value>...</value>" line per element, then
// "</data></array></value>".
// The tag written for an element is chosen by a switch on the kind of the element's type.
// NOTE(review): the case labels are not visible in this excerpt, so the exact
// kind -> tag mapping must be read from the full file.
135 std::string SeqAnyInputPort::dump()
137 stringstream xmldump;
138 int nbElem = getNumberOfElements();
139 xmldump << "<value><array><data>" << endl;
140 for (int i = 0; i < nbElem; i++)
142 Any *val = getValueAtRank(i);
143 switch (val->getType()->kind())
// Element written with getDoubleValue().
146 xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
// Element written with getIntValue().
149 xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
// Element written with getBoolValue().
152 xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
// Element written with getStringValue() inside a <string> tag.
155 xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
// Element written with getStringValue() inside an <objref> tag.
158 xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
// Fixed error text for elements that cannot be serialised (presumably the default case).
161 xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
165 xmldump << "</data></array></value>" << endl;
166 return xmldump.str();
// Builds the splitter node named `name`.
// Its input port _dataPortToDispatch is named NAME_OF_SEQUENCE_INPUT, belongs to this
// node and is typed as a newly allocated sequence (TypeCodeSeq) of `typeOfData`.
// NOTE(review): `father` is not used in the visible lines; the constructor body is not
// visible in this excerpt.
169 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
170 ForEachLoop *father):ElementaryNode(name),
171 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
172 this,new TypeCodeSeq("","",typeOfData))
// Copy constructor: the ElementaryNode base is copied from `other` with `father` as new
// father, and the input port is copied from other's port and re-attached to this node.
// NOTE(review): the constructor body is not visible in this excerpt.
177 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
178 _dataPortToDispatch(other._dataPortToDispatch,this)
// Looks up an input port by name.
// Returns the address of _dataPortToDispatch when `name` equals NAME_OF_SEQUENCE_INPUT;
// any other name is resolved by ElementaryNode::getInputPort().
182 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(Exception)
184 if(name==NAME_OF_SEQUENCE_INPUT)
// The C-style cast also removes the const that this const method puts on the member.
185 return (InputPort *)&_dataPortToDispatch;
187 return ElementaryNode::getInputPort(name);
// Returns a copy of this splitter, allocated with new, with `father` as its father.
// `father` is cast to ForEachLoop* without a check; `editionOnly` is not used in the
// visible line.
190 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
192 return new SplitterNode(*this,(ForEachLoop *)father);
// Returns the number of elements of the sequence held by the input port
// _dataPortToDispatch.
195 unsigned SplitterNode::getNumberOfElements() const
197 return _dataPortToDispatch.getNumberOfElements();
// Intentionally does nothing.
200 void SplitterNode::execute()
202 // Nothing to do: this method should never be called; if it is, something is seriously wrong.
// Initialises the node: runs ElementaryNode::init() and then exInit() on the sequence
// input port, both with the same `start` flag.
205 void SplitterNode::init(bool start)
207 ElementaryNode::init(start);
208 _dataPortToDispatch.exInit(start);
// Takes the element at position `rankInSeq` of the input sequence and hands it to the
// father loop for branch `branch`.
// \param rankInSeq : index of the element in the input sequence
// \param branch    : branch number passed to ForEachLoop::putValueOnBranch()
// \param first     : passed through unchanged to putValueOnBranch()
211 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
213 Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
// _father is assumed to be a ForEachLoop (unchecked C-style cast).
214 ForEachLoop *fatherTyped=(ForEachLoop *)_father;
215 fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
// Release the reference on the fetched value once it has been handed over
// (presumably the one taken by getValueAtRank() — confirm).
216 valueToDispatch->decrRef();
// Builds the fake node that ForEachLoop::exUpdateState() stores in _nodeForSpecialCases.
// The node is named NAME, records `normalFinish` in _normalFinish, is put directly in
// the TOACTIVATE state and takes the loop's own father as its father.
// NOTE(review): _loop is dereferenced below but its initialiser is not visible in this
// excerpt (presumably _loop(loop)).
219 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
221 _normalFinish(normalFinish)
223 _state=YACS::TOACTIVATE;
224 _father=_loop->getFather();
// Copy constructor: copies the ElementaryNode base but leaves the copy detached from
// any loop (_loop=0).
// NOTE(review): the initialiser list continues after the trailing comma and the body
// is not visible in this excerpt.
227 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
// Returns a copy of this fake node allocated with new; neither `father` nor
// `editionOnly` is used.
232 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
234 return new FakeNodeForForEachLoop(*this);
// Propagates the failure through the owning loop, then detaches the loop's
// special-case node.
237 void FakeNodeForForEachLoop::exForwardFailed()
239 _loop->exForwardFailed();
// Keep the pointer (expected to be `this`, as the variable name says) before clearing
// the loop's member.
// NOTE(review): what is done with normallyThis afterwards is not visible in this excerpt.
240 FakeNodeForForEachLoop *normallyThis=_loop->_nodeForSpecialCases;
241 _loop->_nodeForSpecialCases=0;
// Propagates the normal termination through the owning loop, then detaches the loop's
// special-case node.
245 void FakeNodeForForEachLoop::exForwardFinished()
247 _loop->exForwardFinished();
// Keep the pointer (expected to be `this`, as the variable name says) before clearing
// the loop's member.
// NOTE(review): what is done with normallyThis afterwards is not visible in this excerpt.
248 FakeNodeForForEachLoop *normallyThis=_loop->_nodeForSpecialCases;
249 _loop->_nodeForSpecialCases=0;
// Either throws an empty Exception, whose only purpose is to make the Executor abort
// this node, or asks the loop to push all its collected sequence values.
// NOTE(review): the condition choosing between the two paths is not visible in this
// excerpt (presumably _normalFinish — confirm).
253 void FakeNodeForForEachLoop::execute()
256 throw Exception("");//only to trigger ABORT on Executor
258 _loop->pushAllSequenceValues();
// Puts the owning loop in the ERROR state.
261 void FakeNodeForForEachLoop::aborted()
263 _loop->setState(YACS::ERROR);
// Puts the owning loop in the DONE state.
266 void FakeNodeForForEachLoop::finished()
268 _loop->setState(YACS::DONE);
// Builds a ForEachLoop named `name` whose samples have type `typeOfDataSplitted`.
// The embedded splitter node is named NAME_OF_SPLITTERNODE and has this loop as father;
// the index of the next sample (_execCurrentId) starts at 0 and there is no
// special-case node yet.
// NOTE(review): the constructor body is not visible in this excerpt.
271 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
272 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
273 _execCurrentId(0),_nodeForSpecialCases(0)
// Copy constructor: copies the DynParaLoop part and the splitter node, resets the
// execution counters, then rebuilds one AnySplitOutputPort / InterceptorInputPort pair
// for every outgoing port of `other`.
// NOTE(review): the declaration of the index `i` is not visible in this excerpt.
277 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
278 _splitterNode(other._splitterNode,this),
279 _execCurrentId(0),_nodeForSpecialCases(0)
// `i` walks other._intecptrsForOutGoingPorts in step with other._outGoingPorts.
283 for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
// New split port and new interceptor, both attached to this loop.
285 AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
286 InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
// The represented port is looked up by the name of other's split port.
287 temp->addRepr(getOutPort((*iter2)->getName()),interc);
288 interc->setRepr(temp);
289 _outGoingPorts.push_back(temp);
290 _intecptrsForOutGoingPorts.push_back(interc);
// Returns a copy of this loop, allocated with new, built by the copy constructor with
// the given `father` and `editionOnly` flag.
294 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
296 return new ForEachLoop(*this,father,editionOnly);
// Destructor: walks the outgoing split ports and then their interceptors.
// NOTE(review): the statements executed for each element (and anything before the first
// loop) are not visible in this excerpt; presumably each port is deleted — confirm.
299 ForEachLoop::~ForEachLoop()
302 for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
304 for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
// Initialises the loop: runs DynParaLoop::init() and then the splitter node's init(),
// both with the same `start` flag.
// NOTE(review): further statements may follow in the full file; they are not visible here.
308 void ForEachLoop::init(bool start)
310 DynParaLoop::init(start);
311 _splitterNode.init(start);
// Updates the loop state and, when the loop can start, builds the per-branch execution
// graph: for each branch a clone of _node (and of _initNode), the wiring to ports
// outside the loop scope, and the dispatch of a first sample to the branch.
// NOTE(review): several conditions, braces and statements of this function are not
// visible in this excerpt; the comments below describe only the visible statements.
316 void ForEachLoop::exUpdateState()
// Disabled loop: the action taken is not visible here.
318 if(_state == YACS::DISABLED)
// Nothing below applies unless the input gate is ready.
320 if(_inGate.exIsReady())
322 //setState(YACS::TOACTIVATE); // call this method below
323 //internal graph update
// Number of branches requested and number of samples in the input sequence.
325 int nbOfBr=_nbOfBranches.getIntValue();
326 int nbOfElts=_splitterNode.getNumberOfElements();
// Special case (condition not visible — presumably an empty sample sequence): output
// sequences sized for 0 samples and a fake node created with normalFinish=true.
329 prepareSequenceValues(0);
330 delete _nodeForSpecialCases;
331 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
// Second special case (condition not visible): the fake node finishes normally only
// when no out port leaves the current scope.
336 delete _nodeForSpecialCases;
337 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
// General case: size the per-branch containers and the output sequences.
342 _execNodes.resize(nbOfBr);
343 _execIds.resize(nbOfBr);
344 _execOutGoingPorts.resize(nbOfBr);
345 prepareSequenceValues(nbOfElts);
347 _execInitNodes.resize(nbOfBr);
349 //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
350 //so catch them to control errors
// Per branch: record the sample index it will treat, clone the work node (and the init
// node — any guard on _initNode is not visible), wire inputs and outputs crossing the
// loop scope, then dispatch sample _execCurrentId and advance the counter.
353 for(i=0;i<nbOfBr;i++)
355 DEBTRACE( "-------------- 1" );
356 _execIds[i]=_execCurrentId;
357 DEBTRACE( "-------------- 2" );
358 _execNodes[i]=_node->clone(this,false);
359 DEBTRACE( "-------------- 3" );
361 _execInitNodes[i]=_initNode->clone(this,false);
362 DEBTRACE( "-------------- 4" );
363 prepareInputsFromOutOfScope(i);
364 DEBTRACE( "-------------- 5" );
365 createOutputOutOfScopeInterceptors(i);
366 DEBTRACE( "-------------- 6" );
367 _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
368 DEBTRACE( "-------------- 7" );
371 catch(YACS::Exception& ex)
373 //ForEachLoop must be put in error and the exception rethrown to notify the caller
374 DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
375 setState(YACS::ERROR);
380 setState(YACS::TOACTIVATE); // move the calling of setState method there for adding observers for clone nodes in GUI part
// Start the branches: exUpdateState() is called on the init clone or on the work clone
// (the condition choosing between them is not visible in this excerpt).
383 for(i=0;i<nbOfBr;i++)
385 _execInitNodes[i]->exUpdateState();
389 _execNodes[i]->exUpdateState();
// Appends to `tasks` the tasks that are ready inside this loop.
// Only acts when the loop is TOACTIVATE or ACTIVATED. A special-case fake node, when
// present, is asked for its tasks; the cloned work nodes and the cloned init nodes are
// asked for theirs.
// NOTE(review): whether the fake-node branch excludes the two loops below (early return
// or else) is not visible in this excerpt.
394 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
398 if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
400 if(_nodeForSpecialCases)
402 _nodeForSpecialCases->getReadyTasks(tasks);
405 for(vector<Node *>::iterator iter=_execNodes.begin();iter!=_execNodes.end();iter++)
406 (*iter)->getReadyTasks(tasks);
407 for(vector<Node *>::iterator iter2=_execInitNodes.begin();iter2!=_execInitNodes.end();iter2++)
408 (*iter2)->getReadyTasks(tasks);
// Returns one more than DynParaLoop reports; getSetOfInputPort() likewise appends a
// single extra port, the splitter's sequence input port.
412 int ForEachLoop::getNumberOfInputPorts() const
414 return DynParaLoop::getNumberOfInputPorts()+1;
417 void ForEachLoop::checkConsistency(ComposedNode *pointOfView) const throw(Exception)
422 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(Exception)
427 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
// Returns the input ports of DynParaLoop followed by the splitter's sequence input port.
// NOTE(review): the return statement is not visible in this excerpt.
431 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
433 list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
// The C-style cast also removes the const that this const method puts on the member.
434 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
// Looks up an input port by name.
// Returns the splitter's sequence input port when `name` equals
// SplitterNode::NAME_OF_SEQUENCE_INPUT; any other name is resolved by
// DynParaLoop::getInputPort().
438 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(Exception)
440 if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
441 return (InputPort *)&_splitterNode._dataPortToDispatch;
443 return DynParaLoop::getInputPort(name);
// Looks up an output port by name.
// The outgoing split ports are searched first (first name match wins); if none matches,
// the lookup is delegated to DynParaLoop::getOutputPort().
446 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(Exception)
448 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
450 if(name==(*iter)->getName())
451 return (OutputPort *)(*iter);
453 return DynParaLoop::getOutputPort(name);
// Looks up an out port by name.
// The outgoing split ports are searched first (first name match wins); if none matches,
// the lookup is delegated to DynParaLoop::getOutPort().
456 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(Exception)
458 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
460 if(name==(*iter)->getName())
461 return (OutPort *)(*iter);
463 return DynParaLoop::getOutPort(name);
// Looks up a direct child by its short name.
// Returns the embedded splitter node when `name` equals NAME_OF_SPLITTERNODE; any other
// name is resolved by DynParaLoop::getChildByShortName().
466 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(Exception)
468 if(name==NAME_OF_SPLITTERNODE)
// The C-style cast also removes the const that this const method puts on the member.
469 return (Node *)&_splitterNode;
471 return DynParaLoop::getChildByShortName(name);
// Reacts to the end of `node`, one of the nodes cloned for a branch.
// getIdentityOfNotifyerNode() classifies the node and yields its branch number `id`.
// For a finished work node the branch results are stored, then either the branch is
// retired (no sample left) or it is given the next sample.
// NOTE(review): the declaration of `id`, the case labels, several braces and the return
// values of the non-default paths are not visible in this excerpt.
474 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
477 switch(getIdentityOfNotifyerNode(node,id))
// Start the work node of branch `id` (presumably the case where the init node has
// just finished — confirm).
480 _execNodes[id]->exUpdateState();
// Save the outputs of branch `id` at the rank of the sample it has just treated.
484 storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
485 if(_execCurrentId==_splitterNode.getNumberOfElements())
486 {//No more elements of _dataPortToDispatch to treat
487 _execIds[id]=NOT_RUNNING_BRANCH_ID;
488 //analyzing if some samples are still on treatment on other branches.
489 bool isFinished=true;
// Stops at the first branch that is still running; note the int / size() comparison.
490 for(int i=0;i<_execIds.size() and isFinished;i++)
491 isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
// Publish the collected sequences and mark the loop DONE (the enclosing condition is
// not visible — presumably isFinished).
496 pushAllSequenceValues();
497 setState(YACS::DONE);
500 catch(YACS::Exception& ex)
502 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
503 //no way to push results : put following nodes in FAILED state
504 //TODO could be more fine grain : put only concerned nodes in FAILED state
506 setState(YACS::ERROR);
512 {//more elements to do
// Branch `id` takes the next sample; the counter advances after the dispatch call.
513 _execIds[id]=_execCurrentId;
515 _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
516 node->exUpdateState();
521 return YACS::NOEVENT;
// Creates or reuses the split output port that represents `port.first` outside the loop.
// DynParaLoop::buildDelegateOf() runs first. Only ports whose instance type name equals
// OutputPort::NAME are handled; the exception at the end covers the other case.
// NOTE(review): the declaration of `i`, the loop exit, the else branches and any update
// of `port` itself are not visible in this excerpt.
524 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::set<ComposedNode *>& pointsOfView)
526 DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
527 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
528 if(typeOfPortInstance==OutputPort::NAME)
// Search for a split port that already represents port.first; `i` tracks its index so
// that the matching interceptor can be picked from _intecptrsForOutGoingPorts.
530 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
532 for(;iter!=_outGoingPorts.end();iter++,i++)
533 if((*iter)->getRepr()==port.first)
// Found: re-register port.first on the existing split port with its existing interceptor.
535 if(iter!=_outGoingPorts.end())
538 (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
// Not found: create a split port typed as a sequence of port.first's type, plus an
// interceptor typed like port.first, link them and record both.
543 TypeCodeSeq *newTc=new TypeCodeSeq("","",port.first->edGetType());
544 AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
545 InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
546 intercptor->setRepr(newPort);
548 newPort->addRepr(port.first,intercptor);
549 _outGoingPorts.push_back(newPort);
550 _intecptrsForOutGoingPorts.push_back(intercptor);
555 throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
// Finds the split output port that represents `port.first` outside the loop.
// Only ports whose instance type name equals OutputPort::NAME are handled; the last
// exception covers the other case.
// Throws Exception when no outgoing split port represents port.first.
// NOTE(review): the loop exit and the statement that uses the port once found are not
// visible in this excerpt.
558 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::set<ComposedNode *>& pointsOfView) throw(Exception)
560 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
561 if(typeOfPortInstance==OutputPort::NAME)
563 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
564 for(;iter!=_outGoingPorts.end();iter++)
565 if((*iter)->getRepr()==port.first)
567 if(iter==_outGoingPorts.end())
// The message names the port and this loop (_name).
569 string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
570 throw Exception(what);
576 throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
// Releases one use of the split output port that represents `portDwn`.
// Only ports whose instance type name equals OutputPort::NAME are handled. When
// decrRef() on the split port returns true, the port and its interceptor are removed
// from _outGoingPorts and _intecptrsForOutGoingPorts.
// NOTE(review): the loop exit and any deletion of the removed objects are not visible
// in this excerpt.
579 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::set<ComposedNode *>& pointsOfView) throw(Exception)
581 string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
582 if(typeOfPortInstance==OutputPort::NAME)
// The two vectors are walked in step so that iter2 designates the interceptor of *iter.
584 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
585 vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
586 for(;iter!=_outGoingPorts.end();iter++,iter2++)
587 if((*iter)->getRepr()==portDwn)
589 //ASSERT(portUp==*iter.second)
590 if((*iter)->decrRef())
592 _outGoingPorts.erase(iter);
594 _intecptrsForOutGoingPorts.erase(iter2);
// Returns the out port designated by `name` on the clone running in branch `branchNb`.
// `name` is split at Node::SEP_CHAR_IN_PORT into a node part and a port part; only the
// port part is used for the lookup on _execNodes[branchNb].
// NOTE(review): staticChild is computed but not used in the visible lines; the call to
// getChildByName() may serve only as a validation of nodeName — confirm.
600 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
602 string portName, nodeName;
603 splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
604 Node *staticChild = getChildByName(nodeName);
605 return _execNodes[branchNb]->getOutPort(portName);//It's impossible (guaranteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
606 //that a link starting from _initNode goes out of scope of 'this'.
// Cleans the per-execution data of the loop: runs DynParaLoop::cleanDynGraph(), walks
// the output sequences in _execVals and the per-branch interceptor ports in
// _execOutGoingPorts, then empties _execOutGoingPorts.
// NOTE(review): the statements applied to each sequence and each port are not visible
// in this excerpt.
609 void ForEachLoop::cleanDynGraph()
611 DynParaLoop::cleanDynGraph();
612 for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
615 for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
616 for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
618 _execOutGoingPorts.clear();
// Copies the values received by the interceptor ports of branch `branchNb` into the
// output sequences, at position `rank`.
// \param rank     : index in each output sequence (the callers pass the sample index)
// \param branchNb : branch whose interceptor ports are read
// NOTE(review): the declaration of `i` is not visible in this excerpt.
621 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
623 vector<AnyInputPort *>::iterator iter;
// The i-th interceptor of the branch feeds the i-th output sequence.
625 for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
627 Any *val=(Any *)(*iter)->getValue();
628 _execVals[i]->setEltAtRank(rank,val);
// Creates one output sequence per outgoing split port.
// _execVals is resized to the number of outgoing ports and each entry is replaced by a
// new SequenceAny whose element type is the content type of the port's type.
// \param sizeOfSamples : size passed to SequenceAny::New() for every sequence
632 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
634 _execVals.resize(_outGoingPorts.size());
635 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
636 for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
637 _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
// Puts each collected output sequence on its outgoing split port: the i-th entry of
// _execVals goes to the i-th port of _outGoingPorts.
// NOTE(review): the declaration of `i` is not visible in this excerpt.
640 void ForEachLoop::pushAllSequenceValues()
642 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
644 for(;iter!=_outGoingPorts.end();iter++,i++)
645 (*iter)->put((const void *)_execVals[i]);
// For branch `branchNb`, plugs one new AnyInputPort on every out port of the branch
// clone whose original is represented by an outgoing split port.
// Each interceptor is named after the split port, typed like the clone's out port, and
// appended to _execOutGoingPorts[branchNb].
// NOTE(review): the declaration of `i` is not visible in this excerpt.
648 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
650 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
652 for(;iter!=_outGoingPorts.end();iter++,i++)
654 DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
655 //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
// Resolve, on the branch clone, the out port matching the represented port.
656 OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
657 DEBTRACE( portOut->getName() );
658 AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
659 portOut->addInPort(interceptor);
660 _execOutGoingPorts[branchNb].push_back(interceptor);
// Rejects a forbidden link by throwing Exception.
// The visible test checks whether the node owning `start` descends from the work node
// _node.
// NOTE(review): the message speaks of a link from the work node to the init node, so a
// companion test on `end` is presumably on a line not visible in this excerpt — confirm.
664 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::set<ComposedNode *>& pointsOfViewStart,
665 InPort *end, const std::set<ComposedNode *>& pointsOfViewEnd) throw(Exception)
667 if(isInMyDescendance(start->getNode())==_node)
668 throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
// Builds the list of output ports local to the loop; in the visible lines it holds the
// port found under the name NAME_OF_SPLITTED_SEQ_OUT.
// NOTE(review): the return statement is not visible in this excerpt.
671 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
673 list<OutputPort *> ret;
674 ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT)); // OCC : mkr : add _splittedPort to the list of output ports
675 //ret.push_back(getOutputPort(SplitterNode::NAME_OF_SEQUENCE_INPUT));
// Visitor entry point: dispatches to visitor->visitForEachLoop() with this loop.
679 void ForEachLoop::accept(Visitor *visitor)
681 visitor->visitForEachLoop(this);
684 //! Dump the node state to a stream
686 * \param os : the output stream
// Writes a DOT-style description of the loop to `os`: a "subgraph cluster_<id>" block,
// an edge from the loop to its work node _node, and the loop's own entry with a fill
// colour taken from its effective state and the label "Loop:<name>".
// NOTE(review): the statements between the visible lines (output of the work node itself,
// closing of the subgraph) are not visible in this excerpt.
688 void ForEachLoop::writeDot(std::ostream &os)
690 os << " subgraph cluster_" << getId() << " {\n" ;
691 //only one node in a loop
693 os << getId() << " -> " << _node->getId() << ";\n";
695 os << getId() << "[fillcolor=\"" ;
// The colour is derived from the effective state of the loop.
696 YACS::StatesForNode state=getEffectiveState();
697 os << getColorState(state);
698 os << "\" label=\"" << "Loop:" ;
699 os << getName() <<"\"];\n";