]> SALOME platform Git repositories - modules/yacs.git/blob - src/engine/ForEachLoop.cxx
Salome HOME
Implementation of keep going mode of executor for ForEachLoops.
[modules/yacs.git] / src / engine / ForEachLoop.cxx
1 // Copyright (C) 2006-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
26
27 #include <iostream>
28 #include <sstream>
29
30 //#define _DEVDEBUG_
31 #include "YacsTrace.hxx"
32
33 using namespace YACS::ENGINE;
34 using namespace std;
35
36 /*! \class YACS::ENGINE::ForEachLoop
37  *  \brief Loop node for parametric calculation
38  *
39  *  \ingroup Nodes
40  */
41
42 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
43
44 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
45
46 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
47
48 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
49
50 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
51                                                                                                 DataPort(name,node,type),Port(node),
52                                                                                                 _repr(0)
53 {
54 }
55
56 InterceptorInputPort::InterceptorInputPort(const  InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
57                                                                                                 Port(other,newHelder),
58                                                                                                 _repr(0)
59 {
60 }
61
62 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
63 {
64   set<InPort *> ports=_repr->edSetInPort();
65   for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
66     (*iter)->getAllRepresentants(repr);
67 }
68
69 InputPort *InterceptorInputPort::clone(Node *newHelder) const
70 {
71   return new InterceptorInputPort(*this,newHelder);
72 }
73
74 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
75 {
76   _repr=repr;
77 }
78
79 bool AnySplitOutputPort::decrRef()
80 {
81   return (--_cnt==0);
82 }
83
84 void AnySplitOutputPort::incrRef() const
85 {
86   _cnt++;
87 }
88
89 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
90                                                                                             DataPort(name,node,type),Port(node),
91                                                                                             _repr(0),_intercptr(0),_cnt(1)
92 {
93 }
94
95 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
96                                                                                          DataPort(other,newHelder),
97                                                                                          Port(other,newHelder),
98                                                                                          _repr(0),_intercptr(0),_cnt(1)
99 {
100 }
101
102 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
103 {
104   bool ret=OutputPort::addInPort(inPort);
105   if(_repr)
106     _repr->addInPort(_intercptr);
107   return ret;
108 }
109
110 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
111 {
112   if(!_repr)
113     OutPort::getAllRepresented(represented);
114   else
115     _repr->getAllRepresented(represented);
116 }
117
118 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
119 {
120   bool ret=OutputPort::removeInPort(inPort,forward);
121   if(_repr)
122     if(_setOfInputPort.empty())
123       _repr->removeInPort(_intercptr,forward);
124   return ret;
125 }
126
127 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
128 {
129   _repr=repr;
130   _intercptr=intercptr;
131 }
132
133 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
134 {
135   return new AnySplitOutputPort(*this,newHelder);
136 }
137
138 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
139 {
140   _type->decrRef();
141 }
142
143 SeqAnyInputPort::SeqAnyInputPort(const  SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
144 {
145 }
146
147 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
148 {
149   return new SeqAnyInputPort(*this,newHelder);
150 }
151
152 unsigned SeqAnyInputPort::getNumberOfElements() const
153 {
154   const SequenceAny * valCsted=(const SequenceAny *) _value;
155   if (valCsted) return valCsted->size();
156   return 0;
157 }
158
159 Any *SeqAnyInputPort::getValueAtRank(int i) const
160 {
161   const SequenceAny * valCsted=(const SequenceAny *) _value;
162   AnyPtr ret=(*valCsted)[i];
163   ret->incrRef();
164   return ret;
165 }
166
167 std::string SeqAnyInputPort::dump()
168 {
169   stringstream xmldump;
170   int nbElem = getNumberOfElements();
171   xmldump << "<value><array><data>" << endl;
172   for (int i = 0; i < nbElem; i++)
173     {
174       Any *val = getValueAtRank(i);
175       switch (val->getType()->kind())
176         {
177         case Double:
178           xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
179           break;
180         case Int:
181           xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
182           break;
183         case Bool:
184           xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
185           break;
186         case String:
187           xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
188           break;
189         case Objref:
190           xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
191           break;
192         default:
193           xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
194           break;
195         }
196     }
197   xmldump << "</data></array></value>" << endl;
198   return xmldump.str();
199 }
200
201 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData, 
202                            ForEachLoop *father):ElementaryNode(name),
203                                                 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
204                                                                     this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
205 {
206   _father=father;
207 }
208
209 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
210                                                                            _dataPortToDispatch(other._dataPortToDispatch,this)
211 {
212 }
213
214 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
215 {
216   if(name==NAME_OF_SEQUENCE_INPUT)
217     return (InputPort *)&_dataPortToDispatch;
218   else
219     return ElementaryNode::getInputPort(name);
220 }
221
222 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
223 {
224   return new SplitterNode(*this,(ForEachLoop *)father);
225 }
226
227 unsigned SplitterNode::getNumberOfElements() const
228 {
229   return _dataPortToDispatch.getNumberOfElements();
230 }
231
232 void SplitterNode::execute()
233 {
234   //Nothing : should never been called elsewhere big problem...
235 }
236
237 void SplitterNode::init(bool start)
238 {
239   ElementaryNode::init(start);
240   _dataPortToDispatch.exInit(start);
241 }
242
243 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
244 {
245   Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
246   ForEachLoop *fatherTyped=(ForEachLoop *)_father;
247   fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
248   valueToDispatch->decrRef();
249 }
250
251 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
252                                                                                      _loop(loop),
253                                                                                      _normalFinish(normalFinish)
254 {
255   _state=YACS::TOACTIVATE;
256   _father=_loop->getFather();
257 }
258
259 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
260                                                                                     _normalFinish(false)
261 {
262 }
263
264 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
265 {
266   return new FakeNodeForForEachLoop(*this);
267 }
268
269 void FakeNodeForForEachLoop::exForwardFailed()
270 {
271   _loop->exForwardFailed();
272 }
273
274 void FakeNodeForForEachLoop::exForwardFinished()
275
276   _loop->exForwardFinished();
277 }
278
279 void FakeNodeForForEachLoop::execute()
280 {
281   if(!_normalFinish)
282     throw Exception("");//only to trigger ABORT on Executor
283   else
284     _loop->pushAllSequenceValues();
285 }
286
287 void FakeNodeForForEachLoop::aborted()
288 {
289   _loop->setState(YACS::ERROR);
290 }
291
292 void FakeNodeForForEachLoop::finished()
293 {
294   _loop->setState(YACS::DONE);
295 }
296
297 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
298                                                                                 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
299                                                                                 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
300 {
301 }
302
303 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
304                                                                                            _splitterNode(other._splitterNode,this),
305                                                                                            _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
306 {
307   int i=0;
308   if(!editionOnly)
309     for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
310       {
311         AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
312         InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
313         temp->addRepr(getOutPort((*iter2)->getName()),interc);
314         interc->setRepr(temp);
315         _outGoingPorts.push_back(temp);
316         _intecptrsForOutGoingPorts.push_back(interc);
317       }
318 }
319
320 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
321 {
322   return new ForEachLoop(*this,father,editionOnly);
323 }
324
325 ForEachLoop::~ForEachLoop()
326 {
327   cleanDynGraph();
328   for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
329     delete *iter;
330   for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
331     delete *iter2;
332 }
333
334 void ForEachLoop::init(bool start)
335 {
336   DynParaLoop::init(start);
337   _splitterNode.init(start);
338   _execCurrentId=0;
339   cleanDynGraph();
340   _currentIndex = 0;
341   exUpdateProgress();
342 }
343
344 void ForEachLoop::exUpdateState()
345 {
346   DEBTRACE("ForEachLoop::exUpdateState");
347   if(_state == YACS::DISABLED)
348     return;
349   if(_inGate.exIsReady())
350     {
351       //internal graph update
352       int i;
353       int nbOfBr=_nbOfBranches.getIntValue();
354       int nbOfElts=_splitterNode.getNumberOfElements();
355
356       DEBTRACE("nbOfElts=" << nbOfElts);
357       DEBTRACE("nbOfBr=" << nbOfBr);
358
359       if(nbOfElts==0)
360         {
361           prepareSequenceValues(0);
362           delete _nodeForSpecialCases;
363           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
364           setState(YACS::ACTIVATED);
365           return ;
366         }
367       if(nbOfBr<=0)
368         {
369           delete _nodeForSpecialCases;
370           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
371           setState(YACS::ACTIVATED);
372           return ;
373         }
374       if(nbOfBr>nbOfElts)
375         nbOfBr=nbOfElts;
376       _execNodes.resize(nbOfBr);
377       _execIds.resize(nbOfBr);
378       _execOutGoingPorts.resize(nbOfBr);
379       prepareSequenceValues(nbOfElts);
380       if(_initNode)
381         _execInitNodes.resize(nbOfBr);
382       _initializingCounter = 0;
383       if (_finalizeNode)
384         _execFinalizeNodes.resize(nbOfBr);
385
386       vector<Node *> origNodes;
387       origNodes.push_back(_initNode);
388       origNodes.push_back(_node);
389       origNodes.push_back(_finalizeNode);
390
391       //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors 
392       //so catch them to control errors
393       try
394         {
395           for(i=0;i<nbOfBr;i++)
396             {
397               DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
398               _execIds[i]=_execCurrentId;
399               DEBTRACE( "-------------- 2" );
400               vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
401               if(_initNode)
402                 _execInitNodes[i] = clonedNodes[0];
403               _execNodes[i] = clonedNodes[1];
404               if(_finalizeNode)
405                 _execFinalizeNodes[i] = clonedNodes[2];
406               DEBTRACE( "-------------- 4" );
407               prepareInputsFromOutOfScope(i);
408               DEBTRACE( "-------------- 5" );
409               createOutputOutOfScopeInterceptors(i);
410               DEBTRACE( "-------------- 6" );
411               _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
412               DEBTRACE( "-------------- 7" );
413             } 
414         }
415       catch(YACS::Exception& ex)
416         {
417           //ForEachLoop must be put in error and the exception rethrown to notify the caller
418           DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
419           setState(YACS::ERROR);
420           exForwardFailed();
421           throw;
422         }
423
424       setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
425
426       //let's go
427       for(i=0;i<nbOfBr;i++)
428         if(_initNode)
429           {
430             _execInitNodes[i]->exUpdateState();
431             _initializingCounter++;
432           }
433         else
434           {
435             _nbOfEltConsumed++;
436             _execNodes[i]->exUpdateState();
437           }
438
439       forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
440     }
441 }
442
443 void ForEachLoop::exUpdateProgress()
444 {
445   // emit notification to all observers registered with the dispatcher on any change of the node's state
446   sendEvent("progress");
447 }
448
449 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
450 {
451   if(!_node)
452     return;
453   if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
454   if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
455     {
456       if(_nodeForSpecialCases)
457         {
458           _nodeForSpecialCases->getReadyTasks(tasks);
459           return ;
460         }
461       vector<Node *>::iterator iter;
462       for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
463         (*iter)->getReadyTasks(tasks);
464       for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
465         (*iter)->getReadyTasks(tasks);
466       for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
467         (*iter)->getReadyTasks(tasks);
468     }
469 }
470
471 int ForEachLoop::getNumberOfInputPorts() const
472 {
473   return DynParaLoop::getNumberOfInputPorts()+1;
474 }
475
476 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
477 {
478   //TO DO
479 }
480
481 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
482 {
483 }
484
485 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
486 {
487   list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
488   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
489   return ret;
490 }
491
492 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
493 {
494   list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
495   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
496   return ret;
497 }
498
499 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
500 {
501   if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
502     return (InputPort *)&_splitterNode._dataPortToDispatch;
503   else
504     return DynParaLoop::getInputPort(name);
505 }
506
507 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
508 {
509   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
510     {
511       if(name==(*iter)->getName())
512         return (OutputPort *)(*iter);
513     }
514   return DynParaLoop::getOutputPort(name);
515 }
516
517 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
518 {
519   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
520     {
521       if(name==(*iter)->getName())
522         return (OutPort *)(*iter);
523     }
524   return DynParaLoop::getOutPort(name);
525 }
526
527 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
528 {
529   if(name==NAME_OF_SPLITTERNODE)
530     return (Node *)&_splitterNode;
531   else
532     return DynParaLoop::getChildByShortName(name);
533 }
534
535 //! Method used to notify the node that a child node has finished
536 /*!
537  * Update the current state and return the change state
538  *
539  *  \param node : the child node that has finished
540  *  \return the state change
541  */
542 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
543 {
544   DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
545   unsigned int id;
546   switch(getIdentityOfNotifyerNode(node,id))
547     {
548     case INIT_NODE:
549       return updateStateForInitNodeOnFinishedEventFrom(node,id);
550     case WORK_NODE:
551       return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
552     case FINALIZE_NODE:
553       return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
554     default:
555       YASSERT(false);
556     }
557   return YACS::NOEVENT;
558 }
559
560 YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
561 {
562   _execNodes[id]->exUpdateState();
563   _nbOfEltConsumed++;
564   _initializingCounter--;
565   if (_initializingCounter == 0)
566     _initNode->setState(DONE);
567   return YACS::NOEVENT;
568 }
569
570 /*!
571  * \param [in] isNormalFinish - if true
572  */
573 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
574 {
575   _currentIndex++;
576   exUpdateProgress();
577   if(isNormalFinish)
578     storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
579   if(_execCurrentId==_splitterNode.getNumberOfElements())
580     {//No more elements of _dataPortToDispatch to treat
581       _execIds[id]=NOT_RUNNING_BRANCH_ID;
582       //analyzing if some samples are still on treatment on other branches.
583       bool isFinished(true);
584       for(int i=0;i<_execIds.size() && isFinished;i++)
585         isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
586       if(isFinished)
587         {
588           try
589           {
590               if(_failedCounter!=0)
591                 {
592                   std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
593                   throw YACS::Exception(oss.str());
594                 }
595               pushAllSequenceValues();
596
597               if (_node)
598                 {
599                   _node->setState(YACS::DONE);
600
601                   ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
602                   if (compNode)
603                     {
604                       std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
605                       std::list<Node *>::iterator iter=aChldn.begin();
606                       for(;iter!=aChldn.end();iter++)
607                         (*iter)->setState(YACS::DONE);
608                     }
609                 }
610
611               if (_finalizeNode == NULL)
612                 {
613                   // No finalize node, we just finish the loop at the end of exec nodes execution
614                   setState(YACS::DONE);
615                   return YACS::FINISH;
616                 }
617               else
618                 {
619                   // Run the finalize nodes, the loop will be done only when they all finish
620                   _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
621                   for (int i=0 ; i<_execIds.size() ; i++)
622                     {
623                       YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
624                       DEBTRACE("Launching finalize node for branch " << i);
625                       _execFinalizeNodes[i]->exUpdateState();
626                       _unfinishedCounter++;
627                     }
628                   return YACS::NOEVENT;
629                 }
630           }
631           catch(YACS::Exception& ex)
632           {
633               DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
634               //no way to push results : put following nodes in FAILED state
635               //TODO could be more fine grain : put only concerned nodes in FAILED state
636               exForwardFailed();
637               setState(YACS::ERROR);
638               return YACS::ABORT;
639           }
640         }
641     }
642   else if(_state == YACS::ACTIVATED)
643     {//more elements to do and loop still activated
644       _execIds[id]=_execCurrentId;
645       node->init(false);
646       _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
647       node->exUpdateState();
648       forwardExecStateToOriginalBody(node);
649       _nbOfEltConsumed++;
650     }
651   else
652     {//elements to process and loop no more activated
653       DEBTRACE("foreach loop state " << _state);
654     }
655   return YACS::NOEVENT;
656 }
657
658 YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
659 {
660   DEBTRACE("Finalize node finished on branch " << id);
661   _unfinishedCounter--;
662   _currentIndex++;
663   exUpdateProgress();
664   DEBTRACE(_unfinishedCounter << " finalize nodes still running");
665   if (_unfinishedCounter == 0)
666     {
667       _finalizeNode->setState(YACS::DONE);
668       setState(YACS::DONE);
669       return YACS::FINISH;
670     }
671   else
672     return YACS::NOEVENT;
673 }
674
675 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
676 {
677   unsigned int id;
678   DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
679   if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
680     return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
681   else
682     {
683       _failedCounter++;
684       return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
685     }
686 }
687
688 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
689 {
690   DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
691   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
692   if(typeOfPortInstance==OutputPort::NAME)
693     {
694       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
695       int i=0;
696       for(;iter!=_outGoingPorts.end();iter++,i++)
697         if((*iter)->getRepr()==port.first || *iter==port.first)
698           break;
699       if(iter!=_outGoingPorts.end())
700         {
701           if(*iter!=port.first)
702             {
703               (*iter)->incrRef();
704               (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
705             }
706           port.first=*iter;
707         }
708       else
709         {
710           TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
711           AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
712           InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
713           intercptor->setRepr(newPort);
714           newTc->decrRef();
715           newPort->addRepr(port.first,intercptor);
716           _outGoingPorts.push_back(newPort);
717           _intecptrsForOutGoingPorts.push_back(intercptor);
718           port.first=newPort;
719         }
720     }
721   else
722     throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
723 }
724
725 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
726 {
727   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
728   if(typeOfPortInstance==OutputPort::NAME)
729     {
730       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
731       for(;iter!=_outGoingPorts.end();iter++)
732         if((*iter)->getRepr()==port.first)
733           break;
734       if(iter==_outGoingPorts.end())
735         {
736           string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name; 
737           throw Exception(what);
738         }
739       else
740         port.first=(*iter);
741     }
742   else
743     throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
744 }
745
746 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
747 {
748   string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
749   if(typeOfPortInstance==OutputPort::NAME)
750     {
751       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
752       vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
753       for(;iter!=_outGoingPorts.end();iter++,iter2++)
754         if((*iter)->getRepr()==portDwn)
755           break;
756       //ASSERT(portUp==*iter.second)
757       if((*iter)->decrRef())
758         {
759           AnySplitOutputPort *p=*iter;
760           _outGoingPorts.erase(iter);
761           delete p;
762           InterceptorInputPort *ip=*iter2;
763           _intecptrsForOutGoingPorts.erase(iter2);
764           delete ip;
765         }
766     }
767 }
768
769 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
770 {
771   string portName, nodeName;
772   splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
773   Node *staticChild = getChildByName(nodeName);
774   return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
775   //that a link starting from _initNode goes out of scope of 'this'.
776 }
777
778 void ForEachLoop::cleanDynGraph()
779 {
780   DynParaLoop::cleanDynGraph();
781   for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
782     (*iter3)->decrRef();
783   _execVals.clear();
784   for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
785     for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
786       delete *iter5;
787   _execOutGoingPorts.clear();
788 }
789
790 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
791 {
792   vector<AnyInputPort *>::iterator iter;
793   int i=0;
794   for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
795     {
796       Any *val=(Any *)(*iter)->getValue();
797       _execVals[i]->setEltAtRank(rank,val);
798     }
799 }
800
801 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
802 {
803   _execVals.resize(_outGoingPorts.size());
804   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
805   for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
806     _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
807 }
808
809 void ForEachLoop::pushAllSequenceValues()
810 {
811   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
812   int i=0;
813   for(;iter!=_outGoingPorts.end();iter++,i++)
814     (*iter)->put((const void *)_execVals[i]);
815 }
816
817 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
818 {
819   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
820   int i=0;
821   for(;iter!=_outGoingPorts.end();iter++,i++)
822     {
823       DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
824       //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
825       OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
826       DEBTRACE( portOut->getName() );
827       AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
828       portOut->addInPort(interceptor);
829       _execOutGoingPorts[branchNb].push_back(interceptor);
830     }
831 }
832
833 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
834                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
835 {
836   if(isInMyDescendance(start->getNode())==_node)
837     throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
838 }
839
840 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
841 {
842   list<OutputPort *> ret;
843   ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT)); 
844   return ret;
845 }
846
847 void ForEachLoop::accept(Visitor *visitor)
848 {
849   visitor->visitForEachLoop(this);
850 }
851
852 //! Dump the node state to a stream
853 /*!
854  * \param os : the output stream
855  */
856 void ForEachLoop::writeDot(std::ostream &os) const
857 {
858   os << "  subgraph cluster_" << getId() << "  {\n" ;
859   //only one node in a loop
860   if(_node)
861     {
862       _node->writeDot(os);
863       os << getId() << " -> " << _node->getId() << ";\n";
864     }
865   os << "}\n" ;
866   os << getId() << "[fillcolor=\"" ;
867   YACS::StatesForNode state=getEffectiveState();
868   os << getColorState(state);
869   os << "\" label=\"" << "Loop:" ;
870   os << getName() <<"\"];\n";
871 }
872
873 //! Reset the state of the node and its children depending on the parameter level
874 void ForEachLoop::resetState(int level)
875 {
876   if(level==0)return;
877   DynParaLoop::resetState(level);
878   _execCurrentId=0;
879   //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object) 
880   cleanDynGraph();
881 }
882
883 std::string ForEachLoop::getProgress() const
884 {
885   int nbElems = _splitterNode.getNumberOfElements();
886   std::stringstream aProgress;
887   if (nbElems > 0)
888     aProgress << _currentIndex << "/" << nbElems;
889   else
890     aProgress << "0";
891   return aProgress.str();
892 }
893
894 /*!
895  * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
896  * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
897  * This method has one input \a execut and 3 outputs.
898  *
899  * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
900  * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
901  *                        All of the items in \a outputs have the same size.
902  * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
903  * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
904  *         SequenceAny objects contained in \a outputs.
905  *
906  * \sa edGetSeqOfSamplesPort
907  */
908 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
909 {
910   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
911   if(_execVals.empty())
912     return std::vector<unsigned int>();
913   if(_execOutGoingPorts.empty())
914     return std::vector<unsigned int>();
915   std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
916   const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
917   for(std::size_t i=0;i<sz;i++)
918     {
919       outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
920       nameOfOutputs[i]=ports[i]->getName();
921     }
922   return _execVals[0]->getSetItems();
923 }
924