]> SALOME platform Git repositories - modules/yacs.git/blob - src/engine/ForEachLoop.cxx
Salome HOME
Retrieve state of ForEachLoop node even in case of failure.
[modules/yacs.git] / src / engine / ForEachLoop.cxx
1 // Copyright (C) 2006-2014  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
26
27 #include <iostream>
28 #include <sstream>
29
30 //#define _DEVDEBUG_
31 #include "YacsTrace.hxx"
32
33 using namespace YACS::ENGINE;
34 using namespace std;
35
36 /*! \class YACS::ENGINE::ForEachLoop
37  *  \brief Loop node for parametric calculation
38  *
39  *  \ingroup Nodes
40  */
41
42 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
43
44 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
45
46 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
47
48 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
49
50 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
51                                                                                                 DataPort(name,node,type),Port(node),
52                                                                                                 _repr(0)
53 {
54 }
55
56 InterceptorInputPort::InterceptorInputPort(const  InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
57                                                                                                 Port(other,newHelder),
58                                                                                                 _repr(0)
59 {
60 }
61
62 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
63 {
64   set<InPort *> ports=_repr->edSetInPort();
65   for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
66     (*iter)->getAllRepresentants(repr);
67 }
68
69 InputPort *InterceptorInputPort::clone(Node *newHelder) const
70 {
71   return new InterceptorInputPort(*this,newHelder);
72 }
73
74 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
75 {
76   _repr=repr;
77 }
78
79 bool AnySplitOutputPort::decrRef()
80 {
81   return (--_cnt==0);
82 }
83
84 void AnySplitOutputPort::incrRef() const
85 {
86   _cnt++;
87 }
88
89 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
90                                                                                             DataPort(name,node,type),Port(node),
91                                                                                             _repr(0),_intercptr(0),_cnt(1)
92 {
93 }
94
95 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
96                                                                                          DataPort(other,newHelder),
97                                                                                          Port(other,newHelder),
98                                                                                          _repr(0),_intercptr(0),_cnt(1)
99 {
100 }
101
102 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
103 {
104   bool ret=OutputPort::addInPort(inPort);
105   if(_repr)
106     _repr->addInPort(_intercptr);
107   return ret;
108 }
109
110 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
111 {
112   if(!_repr)
113     OutPort::getAllRepresented(represented);
114   else
115     _repr->getAllRepresented(represented);
116 }
117
118 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
119 {
120   bool ret=OutputPort::removeInPort(inPort,forward);
121   if(_repr)
122     if(_setOfInputPort.empty())
123       _repr->removeInPort(_intercptr,forward);
124   return ret;
125 }
126
127 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
128 {
129   _repr=repr;
130   _intercptr=intercptr;
131 }
132
133 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
134 {
135   return new AnySplitOutputPort(*this,newHelder);
136 }
137
138 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
139 {
140   _type->decrRef();
141 }
142
143 SeqAnyInputPort::SeqAnyInputPort(const  SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
144 {
145 }
146
147 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
148 {
149   return new SeqAnyInputPort(*this,newHelder);
150 }
151
152 unsigned SeqAnyInputPort::getNumberOfElements() const
153 {
154   const SequenceAny * valCsted=(const SequenceAny *) _value;
155   if (valCsted) return valCsted->size();
156   return 0;
157 }
158
159 Any *SeqAnyInputPort::getValueAtRank(int i) const
160 {
161   const SequenceAny * valCsted=(const SequenceAny *) _value;
162   AnyPtr ret=(*valCsted)[i];
163   ret->incrRef();
164   return ret;
165 }
166
167 std::string SeqAnyInputPort::dump()
168 {
169   stringstream xmldump;
170   int nbElem = getNumberOfElements();
171   xmldump << "<value><array><data>" << endl;
172   for (int i = 0; i < nbElem; i++)
173     {
174       Any *val = getValueAtRank(i);
175       switch (val->getType()->kind())
176         {
177         case Double:
178           xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
179           break;
180         case Int:
181           xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
182           break;
183         case Bool:
184           xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
185           break;
186         case String:
187           xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
188           break;
189         case Objref:
190           xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
191           break;
192         default:
193           xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
194           break;
195         }
196     }
197   xmldump << "</data></array></value>" << endl;
198   return xmldump.str();
199 }
200
201 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData, 
202                            ForEachLoop *father):ElementaryNode(name),
203                                                 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
204                                                                     this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
205 {
206   _father=father;
207 }
208
209 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
210                                                                            _dataPortToDispatch(other._dataPortToDispatch,this)
211 {
212 }
213
214 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
215 {
216   if(name==NAME_OF_SEQUENCE_INPUT)
217     return (InputPort *)&_dataPortToDispatch;
218   else
219     return ElementaryNode::getInputPort(name);
220 }
221
222 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
223 {
224   return new SplitterNode(*this,(ForEachLoop *)father);
225 }
226
227 unsigned SplitterNode::getNumberOfElements() const
228 {
229   return _dataPortToDispatch.getNumberOfElements();
230 }
231
232 void SplitterNode::execute()
233 {
234   //Nothing : should never been called elsewhere big problem...
235 }
236
237 void SplitterNode::init(bool start)
238 {
239   ElementaryNode::init(start);
240   _dataPortToDispatch.exInit(start);
241 }
242
243 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
244 {
245   Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
246   ForEachLoop *fatherTyped=(ForEachLoop *)_father;
247   fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
248   valueToDispatch->decrRef();
249 }
250
251 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
252                                                                                      _loop(loop),
253                                                                                      _normalFinish(normalFinish)
254 {
255   _state=YACS::TOACTIVATE;
256   _father=_loop->getFather();
257 }
258
259 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
260                                                                                     _normalFinish(false)
261 {
262 }
263
264 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
265 {
266   return new FakeNodeForForEachLoop(*this);
267 }
268
269 void FakeNodeForForEachLoop::exForwardFailed()
270 {
271   _loop->exForwardFailed();
272 }
273
274 void FakeNodeForForEachLoop::exForwardFinished()
275
276   _loop->exForwardFinished();
277 }
278
279 void FakeNodeForForEachLoop::execute()
280 {
281   if(!_normalFinish)
282     throw Exception("");//only to trigger ABORT on Executor
283   else
284     _loop->pushAllSequenceValues();
285 }
286
287 void FakeNodeForForEachLoop::aborted()
288 {
289   _loop->setState(YACS::ERROR);
290 }
291
292 void FakeNodeForForEachLoop::finished()
293 {
294   _loop->setState(YACS::DONE);
295 }
296
297 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
298                                                                                 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
299                                                                                 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
300 {
301 }
302
303 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
304                                                                                            _splitterNode(other._splitterNode,this),
305                                                                                            _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
306 {
307   int i=0;
308   if(!editionOnly)
309     for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
310       {
311         AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
312         InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
313         temp->addRepr(getOutPort((*iter2)->getName()),interc);
314         interc->setRepr(temp);
315         _outGoingPorts.push_back(temp);
316         _intecptrsForOutGoingPorts.push_back(interc);
317       }
318 }
319
320 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
321 {
322   return new ForEachLoop(*this,father,editionOnly);
323 }
324
325 ForEachLoop::~ForEachLoop()
326 {
327   cleanDynGraph();
328   for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
329     delete *iter;
330   for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
331     delete *iter2;
332 }
333
334 void ForEachLoop::init(bool start)
335 {
336   DynParaLoop::init(start);
337   _splitterNode.init(start);
338   _execCurrentId=0;
339   cleanDynGraph();
340   _currentIndex = 0;
341   exUpdateProgress();
342 }
343
344 void ForEachLoop::exUpdateState()
345 {
346   DEBTRACE("ForEachLoop::exUpdateState");
347   if(_state == YACS::DISABLED)
348     return;
349   if(_inGate.exIsReady())
350     {
351       //internal graph update
352       int i;
353       int nbOfBr=_nbOfBranches.getIntValue();
354       int nbOfElts=_splitterNode.getNumberOfElements();
355
356       DEBTRACE("nbOfElts=" << nbOfElts);
357       DEBTRACE("nbOfBr=" << nbOfBr);
358
359       if(nbOfElts==0)
360         {
361           prepareSequenceValues(0);
362           delete _nodeForSpecialCases;
363           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
364           setState(YACS::ACTIVATED);
365           return ;
366         }
367       if(nbOfBr<=0)
368         {
369           delete _nodeForSpecialCases;
370           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
371           setState(YACS::ACTIVATED);
372           return ;
373         }
374       if(nbOfBr>nbOfElts)
375         nbOfBr=nbOfElts;
376       _execNodes.resize(nbOfBr);
377       _execIds.resize(nbOfBr);
378       _execOutGoingPorts.resize(nbOfBr);
379       prepareSequenceValues(nbOfElts);
380       if(_initNode)
381         _execInitNodes.resize(nbOfBr);
382       _initializingCounter = 0;
383       if (_finalizeNode)
384         _execFinalizeNodes.resize(nbOfBr);
385
386       vector<Node *> origNodes;
387       origNodes.push_back(_initNode);
388       origNodes.push_back(_node);
389       origNodes.push_back(_finalizeNode);
390
391       //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors 
392       //so catch them to control errors
393       try
394         {
395           for(i=0;i<nbOfBr;i++)
396             {
397               DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
398               _execIds[i]=_execCurrentId;
399               DEBTRACE( "-------------- 2" );
400               vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
401               if(_initNode)
402                 _execInitNodes[i] = clonedNodes[0];
403               _execNodes[i] = clonedNodes[1];
404               if(_finalizeNode)
405                 _execFinalizeNodes[i] = clonedNodes[2];
406               DEBTRACE( "-------------- 4" );
407               prepareInputsFromOutOfScope(i);
408               DEBTRACE( "-------------- 5" );
409               createOutputOutOfScopeInterceptors(i);
410               DEBTRACE( "-------------- 6" );
411               _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
412               DEBTRACE( "-------------- 7" );
413             } 
414         }
415       catch(YACS::Exception& ex)
416         {
417           //ForEachLoop must be put in error and the exception rethrown to notify the caller
418           DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
419           setState(YACS::ERROR);
420           exForwardFailed();
421           throw;
422         }
423
424       setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
425
426       //let's go
427       for(i=0;i<nbOfBr;i++)
428         if(_initNode)
429           {
430             _execInitNodes[i]->exUpdateState();
431             _initializingCounter++;
432           }
433         else
434           {
435             _nbOfEltConsumed++;
436             _execNodes[i]->exUpdateState();
437           }
438
439       forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
440     }
441 }
442
443 void ForEachLoop::exUpdateProgress()
444 {
445   // emit notification to all observers registered with the dispatcher on any change of the node's state
446   sendEvent("progress");
447 }
448
449 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
450 {
451   if(!_node)
452     return;
453   if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
454   if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
455     {
456       if(_nodeForSpecialCases)
457         {
458           _nodeForSpecialCases->getReadyTasks(tasks);
459           return ;
460         }
461       vector<Node *>::iterator iter;
462       for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
463         (*iter)->getReadyTasks(tasks);
464       for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
465         (*iter)->getReadyTasks(tasks);
466       for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
467         (*iter)->getReadyTasks(tasks);
468     }
469 }
470
471 int ForEachLoop::getNumberOfInputPorts() const
472 {
473   return DynParaLoop::getNumberOfInputPorts()+1;
474 }
475
476 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
477 {
478   //TO DO
479 }
480
481 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
482 {
483 }
484
485 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
486 {
487   list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
488   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
489   return ret;
490 }
491
492 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
493 {
494   list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
495   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
496   return ret;
497 }
498
499 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
500 {
501   if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
502     return (InputPort *)&_splitterNode._dataPortToDispatch;
503   else
504     return DynParaLoop::getInputPort(name);
505 }
506
507 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
508 {
509   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
510     {
511       if(name==(*iter)->getName())
512         return (OutputPort *)(*iter);
513     }
514   return DynParaLoop::getOutputPort(name);
515 }
516
517 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
518 {
519   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
520     {
521       if(name==(*iter)->getName())
522         return (OutPort *)(*iter);
523     }
524   return DynParaLoop::getOutPort(name);
525 }
526
527 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
528 {
529   if(name==NAME_OF_SPLITTERNODE)
530     return (Node *)&_splitterNode;
531   else
532     return DynParaLoop::getChildByShortName(name);
533 }
534
535 //! Method used to notify the node that a child node has finished
536 /*!
537  * Update the current state and return the change state
538  *
539  *  \param node : the child node that has finished
540  *  \return the state change
541  */
542 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
543 {
544   DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
545   unsigned int id;
546   switch(getIdentityOfNotifyerNode(node,id))
547     {
548     case INIT_NODE:
549       _execNodes[id]->exUpdateState();
550       _nbOfEltConsumed++;
551       _initializingCounter--;
552       if (_initializingCounter == 0) _initNode->setState(DONE);
553       break;
554     case WORK_NODE:
555       _currentIndex++;
556       exUpdateProgress();
557       storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
558       if(_execCurrentId==_splitterNode.getNumberOfElements())
559         {//No more elements of _dataPortToDispatch to treat
560           _execIds[id]=NOT_RUNNING_BRANCH_ID;
561           //analyzing if some samples are still on treatment on other branches.
562           bool isFinished=true;
563           for(int i=0;i<_execIds.size() && isFinished;i++)
564             isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
565           if(isFinished)
566             {
567               try 
568                 {
569                   pushAllSequenceValues();
570   
571                   if (_node)
572                     {
573                       _node->setState(YACS::DONE);
574      
575                       ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
576                       if (compNode)
577                         {
578                           std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
579                           std::list<Node *>::iterator iter=aChldn.begin();
580                           for(;iter!=aChldn.end();iter++)
581                             (*iter)->setState(YACS::DONE);
582                         }
583                     }
584
585                   if (_finalizeNode == NULL)
586                     {
587                       // No finalize node, we just finish the loop at the end of exec nodes execution
588                       setState(YACS::DONE);
589                       return YACS::FINISH;
590                     }
591                   else
592                     {
593                       // Run the finalize nodes, the loop will be done only when they all finish
594                       _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
595                       for (int i=0 ; i<_execIds.size() ; i++)
596                         {
597                           YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
598                           DEBTRACE("Launching finalize node for branch " << i);
599                           _execFinalizeNodes[i]->exUpdateState();
600                           _unfinishedCounter++;
601                         }
602                       return YACS::NOEVENT;
603                     }
604                 }
605               catch(YACS::Exception& ex)
606                 {
607                   DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
608                   //no way to push results : put following nodes in FAILED state
609                   //TODO could be more fine grain : put only concerned nodes in FAILED state
610                   exForwardFailed();
611                   setState(YACS::ERROR);
612                   return YACS::ABORT;
613                 }
614             }
615         }
616       else if(_state == YACS::ACTIVATED)
617         {//more elements to do and loop still activated
618           _execIds[id]=_execCurrentId;
619           node->init(false);
620           _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
621           node->exUpdateState();
622           forwardExecStateToOriginalBody(node);
623           _nbOfEltConsumed++;
624         }
625       else
626         {//elements to process and loop no more activated
627           DEBTRACE("foreach loop state " << _state);
628         }
629       break;
630     case FINALIZE_NODE:
631     {
632       DEBTRACE("Finalize node finished on branch " << id);
633       _unfinishedCounter--;
634       _currentIndex++;
635       exUpdateProgress();
636       DEBTRACE(_unfinishedCounter << " finalize nodes still running");
637       if (_unfinishedCounter == 0)
638         {
639           _finalizeNode->setState(YACS::DONE);
640           setState(YACS::DONE);
641           return YACS::FINISH;
642         }
643       else
644         return YACS::NOEVENT;
645       break;
646     }
647     default:
648       YASSERT(false);
649     }
650   return YACS::NOEVENT;
651 }
652
653 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
654 {
655   DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
656   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
657   if(typeOfPortInstance==OutputPort::NAME)
658     {
659       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
660       int i=0;
661       for(;iter!=_outGoingPorts.end();iter++,i++)
662         if((*iter)->getRepr()==port.first || *iter==port.first)
663           break;
664       if(iter!=_outGoingPorts.end())
665         {
666           if(*iter!=port.first)
667             {
668               (*iter)->incrRef();
669               (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
670             }
671           port.first=*iter;
672         }
673       else
674         {
675           TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
676           AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
677           InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
678           intercptor->setRepr(newPort);
679           newTc->decrRef();
680           newPort->addRepr(port.first,intercptor);
681           _outGoingPorts.push_back(newPort);
682           _intecptrsForOutGoingPorts.push_back(intercptor);
683           port.first=newPort;
684         }
685     }
686   else
687     throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
688 }
689
690 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
691 {
692   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
693   if(typeOfPortInstance==OutputPort::NAME)
694     {
695       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
696       for(;iter!=_outGoingPorts.end();iter++)
697         if((*iter)->getRepr()==port.first)
698           break;
699       if(iter==_outGoingPorts.end())
700         {
701           string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name; 
702           throw Exception(what);
703         }
704       else
705         port.first=(*iter);
706     }
707   else
708     throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
709 }
710
711 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
712 {
713   string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
714   if(typeOfPortInstance==OutputPort::NAME)
715     {
716       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
717       vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
718       for(;iter!=_outGoingPorts.end();iter++,iter2++)
719         if((*iter)->getRepr()==portDwn)
720           break;
721       //ASSERT(portUp==*iter.second)
722       if((*iter)->decrRef())
723         {
724           AnySplitOutputPort *p=*iter;
725           _outGoingPorts.erase(iter);
726           delete p;
727           InterceptorInputPort *ip=*iter2;
728           _intecptrsForOutGoingPorts.erase(iter2);
729           delete ip;
730         }
731     }
732 }
733
734 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
735 {
736   string portName, nodeName;
737   splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
738   Node *staticChild = getChildByName(nodeName);
739   return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
740   //that a link starting from _initNode goes out of scope of 'this'.
741 }
742
743 void ForEachLoop::cleanDynGraph()
744 {
745   DynParaLoop::cleanDynGraph();
746   for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
747     (*iter3)->decrRef();
748   _execVals.clear();
749   for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
750     for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
751       delete *iter5;
752   _execOutGoingPorts.clear();
753 }
754
755 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
756 {
757   vector<AnyInputPort *>::iterator iter;
758   int i=0;
759   for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
760     {
761       Any *val=(Any *)(*iter)->getValue();
762       _execVals[i]->setEltAtRank(rank,val);
763     }
764 }
765
766 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
767 {
768   _execVals.resize(_outGoingPorts.size());
769   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
770   for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
771     _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
772 }
773
774 void ForEachLoop::pushAllSequenceValues()
775 {
776   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
777   int i=0;
778   for(;iter!=_outGoingPorts.end();iter++,i++)
779     (*iter)->put((const void *)_execVals[i]);
780 }
781
782 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
783 {
784   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
785   int i=0;
786   for(;iter!=_outGoingPorts.end();iter++,i++)
787     {
788       DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
789       //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
790       OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
791       DEBTRACE( portOut->getName() );
792       AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
793       portOut->addInPort(interceptor);
794       _execOutGoingPorts[branchNb].push_back(interceptor);
795     }
796 }
797
798 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
799                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
800 {
801   if(isInMyDescendance(start->getNode())==_node)
802     throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
803 }
804
805 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
806 {
807   list<OutputPort *> ret;
808   ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT)); 
809   return ret;
810 }
811
812 void ForEachLoop::accept(Visitor *visitor)
813 {
814   visitor->visitForEachLoop(this);
815 }
816
817 //! Dump the node state to a stream
818 /*!
819  * \param os : the output stream
820  */
821 void ForEachLoop::writeDot(std::ostream &os) const
822 {
823   os << "  subgraph cluster_" << getId() << "  {\n" ;
824   //only one node in a loop
825   if(_node)
826     {
827       _node->writeDot(os);
828       os << getId() << " -> " << _node->getId() << ";\n";
829     }
830   os << "}\n" ;
831   os << getId() << "[fillcolor=\"" ;
832   YACS::StatesForNode state=getEffectiveState();
833   os << getColorState(state);
834   os << "\" label=\"" << "Loop:" ;
835   os << getName() <<"\"];\n";
836 }
837
838 //! Reset the state of the node and its children depending on the parameter level
839 void ForEachLoop::resetState(int level)
840 {
841   if(level==0)return;
842   DynParaLoop::resetState(level);
843   _execCurrentId=0;
844   //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object) 
845   cleanDynGraph();
846 }
847
848 std::string ForEachLoop::getProgress() const
849 {
850   int nbElems = _splitterNode.getNumberOfElements();
851   std::stringstream aProgress;
852   if (nbElems > 0)
853     aProgress << _currentIndex << "/" << nbElems;
854   else
855     aProgress << "0";
856   return aProgress.str();
857 }
858
859 /*!
860  * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
861  * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
862  * This method has one input \a execut and 3 outputs.
863  *
864  * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
865  * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
866  *                        All of the items in \a outputs have the same size.
867  * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
868  * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
869  *         SequenceAny objects contained in \a outputs.
870  *
871  * \sa edGetSeqOfSamplesPort
872  */
873 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
874 {
875   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
876   if(_execVals.empty())
877     return std::vector<unsigned int>();
878   if(_execOutGoingPorts.empty())
879     return std::vector<unsigned int>();
880   std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
881   const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
882   for(std::size_t i=0;i<sz;i++)
883     {
884       outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
885       nameOfOutputs[i]=ports[i]->getName();
886     }
887   return _execVals[0]->getSetItems();
888 }
889