1 #include "ForEachLoop.hxx"
2 #include "TypeCode.hxx"
8 #include "YacsTrace.hxx"
10 using namespace YACS::ENGINE;
13 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
15 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
17 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
19 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
21 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
22 DataPort(name,node,type),Port(node),
27 InterceptorInputPort::InterceptorInputPort(const InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
28 Port(other,newHelder),
33 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
35 set<InPort *> ports=_repr->edSetInPort();
36 for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
37 (*iter)->getAllRepresentants(repr);
40 InputPort *InterceptorInputPort::clone(Node *newHelder) const
42 return new InterceptorInputPort(*this,newHelder);
45 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
50 bool AnySplitOutputPort::decrRef()
55 void AnySplitOutputPort::incrRef() const
60 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
61 DataPort(name,node,type),Port(node),
62 _repr(0),_intercptr(0),_cnt(1)
66 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
67 DataPort(other,newHelder),
68 Port(other,newHelder),
69 _repr(0),_intercptr(0),_cnt(1)
73 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(Exception)
75 bool ret=OutputPort::addInPort(inPort);
77 _repr->addInPort(_intercptr);
81 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
84 OutPort::getAllRepresented(represented);
86 _repr->getAllRepresented(represented);
89 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(Exception)
91 bool ret=OutputPort::removeInPort(inPort,forward);
93 if(_setOfInputPort.empty())
94 _repr->removeInPort(_intercptr,forward);
98 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
101 _intercptr=intercptr;
104 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
106 return new AnySplitOutputPort(*this,newHelder);
109 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
114 SeqAnyInputPort::SeqAnyInputPort(const SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
118 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
120 return new SeqAnyInputPort(*this,newHelder);
123 unsigned SeqAnyInputPort::getNumberOfElements() const
125 const SequenceAny * valCsted=(const SequenceAny *) _value;
126 return valCsted->size();
129 Any *SeqAnyInputPort::getValueAtRank(int i) const
131 const SequenceAny * valCsted=(const SequenceAny *) _value;
132 AnyPtr ret=(*valCsted)[i];
137 std::string SeqAnyInputPort::dump()
139 stringstream xmldump;
140 int nbElem = getNumberOfElements();
141 xmldump << "<value><array><data>" << endl;
142 for (int i = 0; i < nbElem; i++)
144 Any *val = getValueAtRank(i);
145 switch (val->getType()->kind())
148 xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
151 xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
154 xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
157 xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
160 xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
163 xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
167 xmldump << "</data></array></value>" << endl;
168 return xmldump.str();
171 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
172 ForEachLoop *father):ElementaryNode(name),
173 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
174 this,new TypeCodeSeq("","",typeOfData))
179 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
180 _dataPortToDispatch(other._dataPortToDispatch,this)
184 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(Exception)
186 if(name==NAME_OF_SEQUENCE_INPUT)
187 return (InputPort *)&_dataPortToDispatch;
189 return ElementaryNode::getInputPort(name);
192 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
194 return new SplitterNode(*this,(ForEachLoop *)father);
197 unsigned SplitterNode::getNumberOfElements() const
199 return _dataPortToDispatch.getNumberOfElements();
202 void SplitterNode::execute()
204 //Nothing : should never been called elsewhere big problem...
207 void SplitterNode::init(bool start)
209 ElementaryNode::init(start);
210 _dataPortToDispatch.exInit(start);
213 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
215 Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
216 ForEachLoop *fatherTyped=(ForEachLoop *)_father;
217 fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
218 valueToDispatch->decrRef();
221 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
223 _normalFinish(normalFinish)
225 _state=YACS::TOACTIVATE;
226 _father=_loop->getFather();
229 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
234 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
236 return new FakeNodeForForEachLoop(*this);
239 void FakeNodeForForEachLoop::exForwardFailed()
241 _loop->exForwardFailed();
242 FakeNodeForForEachLoop *normallyThis=_loop->_nodeForSpecialCases;
243 _loop->_nodeForSpecialCases=0;
247 void FakeNodeForForEachLoop::exForwardFinished()
249 _loop->exForwardFinished();
250 FakeNodeForForEachLoop *normallyThis=_loop->_nodeForSpecialCases;
251 _loop->_nodeForSpecialCases=0;
255 void FakeNodeForForEachLoop::execute()
258 throw Exception("");//only to trigger ABORT on Executor
260 _loop->pushAllSequenceValues();
263 void FakeNodeForForEachLoop::aborted()
265 _loop->setState(YACS::ERROR);
268 void FakeNodeForForEachLoop::finished()
270 _loop->setState(YACS::DONE);
273 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
274 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
275 _execCurrentId(0),_nodeForSpecialCases(0)
279 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
280 _splitterNode(other._splitterNode,this),
281 _execCurrentId(0),_nodeForSpecialCases(0)
285 for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
287 AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
288 InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
289 temp->addRepr(getOutPort((*iter2)->getName()),interc);
290 interc->setRepr(temp);
291 _outGoingPorts.push_back(temp);
292 _intecptrsForOutGoingPorts.push_back(interc);
296 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
298 return new ForEachLoop(*this,father,editionOnly);
301 ForEachLoop::~ForEachLoop()
304 for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
306 for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
310 void ForEachLoop::init(bool start)
312 DynParaLoop::init(start);
313 _splitterNode.init(start);
318 void ForEachLoop::exUpdateState()
320 if(_state == YACS::DISABLED)
322 if(_inGate.exIsReady())
324 //setState(YACS::TOACTIVATE); // call this method below
325 //internal graph update
327 int nbOfBr=_nbOfBranches.getIntValue();
328 int nbOfElts=_splitterNode.getNumberOfElements();
331 prepareSequenceValues(0);
332 delete _nodeForSpecialCases;
333 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
338 delete _nodeForSpecialCases;
339 _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
344 _execNodes.resize(nbOfBr);
345 _execIds.resize(nbOfBr);
346 _execOutGoingPorts.resize(nbOfBr);
347 prepareSequenceValues(nbOfElts);
349 _execInitNodes.resize(nbOfBr);
351 //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors
352 //so catch them to control errors
355 for(i=0;i<nbOfBr;i++)
357 DEBTRACE( "-------------- 1" );
358 _execIds[i]=_execCurrentId;
359 DEBTRACE( "-------------- 2" );
360 _execNodes[i]=_node->clone(this,false);
361 DEBTRACE( "-------------- 3" );
363 _execInitNodes[i]=_initNode->clone(this,false);
364 DEBTRACE( "-------------- 4" );
365 prepareInputsFromOutOfScope(i);
366 DEBTRACE( "-------------- 5" );
367 createOutputOutOfScopeInterceptors(i);
368 DEBTRACE( "-------------- 6" );
369 _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
370 DEBTRACE( "-------------- 7" );
373 catch(YACS::Exception& ex)
375 //ForEachLoop must be put in error and the exception rethrown to notify the caller
376 DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
377 setState(YACS::ERROR);
382 setState(YACS::TOACTIVATE); // move the calling of setState method there for adding observers for clone nodes in GUI part
385 for(i=0;i<nbOfBr;i++)
387 _execInitNodes[i]->exUpdateState();
391 _execNodes[i]->exUpdateState();
396 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
400 if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
402 if(_nodeForSpecialCases)
404 _nodeForSpecialCases->getReadyTasks(tasks);
407 for(vector<Node *>::iterator iter=_execNodes.begin();iter!=_execNodes.end();iter++)
408 (*iter)->getReadyTasks(tasks);
409 for(vector<Node *>::iterator iter2=_execInitNodes.begin();iter2!=_execInitNodes.end();iter2++)
410 (*iter2)->getReadyTasks(tasks);
414 int ForEachLoop::getNumberOfInputPorts() const
416 return DynParaLoop::getNumberOfInputPorts()+1;
419 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(Exception)
424 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
428 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
430 list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
431 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
435 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
437 list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
438 ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
442 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(Exception)
444 if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
445 return (InputPort *)&_splitterNode._dataPortToDispatch;
447 return DynParaLoop::getInputPort(name);
450 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(Exception)
452 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
454 if(name==(*iter)->getName())
455 return (OutputPort *)(*iter);
457 return DynParaLoop::getOutputPort(name);
460 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(Exception)
462 for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
464 if(name==(*iter)->getName())
465 return (OutPort *)(*iter);
467 return DynParaLoop::getOutPort(name);
470 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(Exception)
472 if(name==NAME_OF_SPLITTERNODE)
473 return (Node *)&_splitterNode;
475 return DynParaLoop::getChildByShortName(name);
478 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
481 switch(getIdentityOfNotifyerNode(node,id))
484 _execNodes[id]->exUpdateState();
488 storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
489 if(_execCurrentId==_splitterNode.getNumberOfElements())
490 {//No more elements of _dataPortToDispatch to treat
491 _execIds[id]=NOT_RUNNING_BRANCH_ID;
492 //analyzing if some samples are still on treatment on other branches.
493 bool isFinished=true;
494 for(int i=0;i<_execIds.size() and isFinished;i++)
495 isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
500 pushAllSequenceValues();
501 setState(YACS::DONE);
504 catch(YACS::Exception& ex)
506 DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
507 //no way to push results : put following nodes in FAILED state
508 //TODO could be more fine grain : put only concerned nodes in FAILED state
510 setState(YACS::ERROR);
516 {//more elements to do
517 _execIds[id]=_execCurrentId;
519 _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
520 node->exUpdateState();
525 return YACS::NOEVENT;
528 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
530 DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
531 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
532 if(typeOfPortInstance==OutputPort::NAME)
534 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
536 for(;iter!=_outGoingPorts.end();iter++,i++)
537 if((*iter)->getRepr()==port.first)
539 if(iter!=_outGoingPorts.end())
542 (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
547 TypeCodeSeq *newTc=new TypeCodeSeq("","",port.first->edGetType());
548 AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
549 InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
550 intercptor->setRepr(newPort);
552 newPort->addRepr(port.first,intercptor);
553 _outGoingPorts.push_back(newPort);
554 _intecptrsForOutGoingPorts.push_back(intercptor);
559 throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
562 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(Exception)
564 string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
565 if(typeOfPortInstance==OutputPort::NAME)
567 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
568 for(;iter!=_outGoingPorts.end();iter++)
569 if((*iter)->getRepr()==port.first)
571 if(iter==_outGoingPorts.end())
573 string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
574 throw Exception(what);
580 throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
583 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(Exception)
585 string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
586 if(typeOfPortInstance==OutputPort::NAME)
588 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
589 vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
590 for(;iter!=_outGoingPorts.end();iter++,iter2++)
591 if((*iter)->getRepr()==portDwn)
593 //ASSERT(portUp==*iter.second)
594 if((*iter)->decrRef())
596 _outGoingPorts.erase(iter);
598 _intecptrsForOutGoingPorts.erase(iter2);
604 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
606 string portName, nodeName;
607 splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
608 Node *staticChild = getChildByName(nodeName);
609 return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
610 //that a link starting from _initNode goes out of scope of 'this'.
613 void ForEachLoop::cleanDynGraph()
615 DynParaLoop::cleanDynGraph();
616 for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
619 for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
620 for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
622 _execOutGoingPorts.clear();
625 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
627 vector<AnyInputPort *>::iterator iter;
629 for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
631 Any *val=(Any *)(*iter)->getValue();
632 _execVals[i]->setEltAtRank(rank,val);
636 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
638 _execVals.resize(_outGoingPorts.size());
639 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
640 for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
641 _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
644 void ForEachLoop::pushAllSequenceValues()
646 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
648 for(;iter!=_outGoingPorts.end();iter++,i++)
649 (*iter)->put((const void *)_execVals[i]);
652 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
654 vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
656 for(;iter!=_outGoingPorts.end();iter++,i++)
658 DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
659 //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
660 OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
661 DEBTRACE( portOut->getName() );
662 AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
663 portOut->addInPort(interceptor);
664 _execOutGoingPorts[branchNb].push_back(interceptor);
668 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
669 InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(Exception)
671 if(isInMyDescendance(start->getNode())==_node)
672 throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
675 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
677 list<OutputPort *> ret;
678 ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT)); // OCC : mkr : add _splittedPort to the list of output ports
679 //ret.push_back(getOutputPort(SplitterNode::NAME_OF_SEQUENCE_INPUT));
683 void ForEachLoop::accept(Visitor *visitor)
685 visitor->visitForEachLoop(this);
688 //! Dump the node state to a stream
690 * \param os : the output stream
692 void ForEachLoop::writeDot(std::ostream &os) const
694 os << " subgraph cluster_" << getId() << " {\n" ;
695 //only one node in a loop
699 os << getId() << " -> " << _node->getId() << ";\n";
702 os << getId() << "[fillcolor=\"" ;
703 YACS::StatesForNode state=getEffectiveState();
704 os << getColorState(state);
705 os << "\" label=\"" << "Loop:" ;
706 os << getName() <<"\"];\n";