Salome HOME
ec235621c0efe4116efcefb8e4f18a6f3ff02d0f
[modules/yacs.git] / src / engine / ForEachLoop.cxx
1 // Copyright (C) 2006-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
26
27 #include <iostream>
28 #include <iomanip>
29 #include <sstream>
30 #include <algorithm>    // std::replace_if
31
32 //#define _DEVDEBUG_
33 #include "YacsTrace.hxx"
34
35 using namespace YACS::ENGINE;
36 using namespace std;
37
38 /*! \class YACS::ENGINE::ForEachLoop
39  *  \brief Loop node for parametric calculation
40  *
41  *  \ingroup Nodes
42  */
43
44 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
45
46 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
47
48 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
49
50 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
51
52 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
53                                                                                                 DataPort(name,node,type),Port(node),
54                                                                                                 _repr(0)
55 {
56 }
57
58 InterceptorInputPort::InterceptorInputPort(const  InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
59                                                                                                 Port(other,newHelder),
60                                                                                                 _repr(0)
61 {
62 }
63
64 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
65 {
66   set<InPort *> ports=_repr->edSetInPort();
67   for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
68     (*iter)->getAllRepresentants(repr);
69 }
70
71 InputPort *InterceptorInputPort::clone(Node *newHelder) const
72 {
73   return new InterceptorInputPort(*this,newHelder);
74 }
75
76 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
77 {
78   _repr=repr;
79 }
80
81 bool AnySplitOutputPort::decrRef()
82 {
83   return (--_cnt==0);
84 }
85
86 void AnySplitOutputPort::incrRef() const
87 {
88   _cnt++;
89 }
90
91 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
92                                                                                             DataPort(name,node,type),Port(node),
93                                                                                             _repr(0),_intercptr(0),_cnt(1)
94 {
95 }
96
97 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
98                                                                                          DataPort(other,newHelder),
99                                                                                          Port(other,newHelder),
100                                                                                          _repr(0),_intercptr(0),_cnt(1)
101 {
102 }
103
104 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
105 {
106   bool ret=OutputPort::addInPort(inPort);
107   if(_repr)
108     _repr->addInPort(_intercptr);
109   return ret;
110 }
111
112 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
113 {
114   if(!_repr)
115     OutPort::getAllRepresented(represented);
116   else
117     _repr->getAllRepresented(represented);
118 }
119
120 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
121 {
122   bool ret=OutputPort::removeInPort(inPort,forward);
123   if(_repr)
124     if(_setOfInputPort.empty())
125       _repr->removeInPort(_intercptr,forward);
126   return ret;
127 }
128
129 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
130 {
131   _repr=repr;
132   _intercptr=intercptr;
133 }
134
135 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
136 {
137   return new AnySplitOutputPort(*this,newHelder);
138 }
139
140 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
141 {
142   _type->decrRef();
143 }
144
145 SeqAnyInputPort::SeqAnyInputPort(const  SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
146 {
147 }
148
149 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
150 {
151   return new SeqAnyInputPort(*this,newHelder);
152 }
153
154 unsigned SeqAnyInputPort::getNumberOfElements() const
155 {
156   const SequenceAny * valCsted=(const SequenceAny *) _value;
157   if (valCsted) return valCsted->size();
158   return 0;
159 }
160
161 Any *SeqAnyInputPort::getValueAtRank(int i) const
162 {
163   const SequenceAny * valCsted=(const SequenceAny *) _value;
164   AnyPtr ret=(*valCsted)[i];
165   ret->incrRef();
166   return ret;
167 }
168
169 std::string SeqAnyInputPort::dump()
170 {
171   stringstream xmldump;
172   int nbElem = getNumberOfElements();
173   xmldump << "<value><array><data>" << endl;
174   for (int i = 0; i < nbElem; i++)
175     {
176       Any *val = getValueAtRank(i);
177       switch (val->getType()->kind())
178         {
179         case Double:
180           xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
181           break;
182         case Int:
183           xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
184           break;
185         case Bool:
186           xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
187           break;
188         case String:
189           xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
190           break;
191         case Objref:
192           xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
193           break;
194         default:
195           xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
196           break;
197         }
198     }
199   xmldump << "</data></array></value>" << endl;
200   return xmldump.str();
201 }
202
203 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData, 
204                            ForEachLoop *father):ElementaryNode(name),
205                                                 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
206                                                                     this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
207 {
208   _father=father;
209 }
210
211 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
212                                                                            _dataPortToDispatch(other._dataPortToDispatch,this)
213 {
214 }
215
216 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
217 {
218   if(name==NAME_OF_SEQUENCE_INPUT)
219     return (InputPort *)&_dataPortToDispatch;
220   else
221     return ElementaryNode::getInputPort(name);
222 }
223
224 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
225 {
226   return new SplitterNode(*this,(ForEachLoop *)father);
227 }
228
229 unsigned SplitterNode::getNumberOfElements() const
230 {
231   return _dataPortToDispatch.getNumberOfElements();
232 }
233
234 void SplitterNode::execute()
235 {
236   //Nothing : should never been called elsewhere big problem...
237 }
238
239 void SplitterNode::init(bool start)
240 {
241   ElementaryNode::init(start);
242   _dataPortToDispatch.exInit(start);
243 }
244
245 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
246 {
247   Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
248   ForEachLoop *fatherTyped=(ForEachLoop *)_father;
249   fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
250   valueToDispatch->decrRef();
251 }
252
253 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
254                                                                                      _loop(loop),
255                                                                                      _normalFinish(normalFinish)
256 {
257   _state=YACS::TOACTIVATE;
258   _father=_loop->getFather();
259 }
260
261 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
262                                                                                     _normalFinish(false)
263 {
264 }
265
266 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
267 {
268   return new FakeNodeForForEachLoop(*this);
269 }
270
271 void FakeNodeForForEachLoop::exForwardFailed()
272 {
273   _loop->exForwardFailed();
274 }
275
276 void FakeNodeForForEachLoop::exForwardFinished()
277
278   _loop->exForwardFinished();
279 }
280
281 void FakeNodeForForEachLoop::execute()
282 {
283   if(!_normalFinish)
284     throw Exception("");//only to trigger ABORT on Executor
285   else
286     _loop->pushAllSequenceValues();
287 }
288
289 void FakeNodeForForEachLoop::aborted()
290 {
291   _loop->setState(YACS::ERROR);
292 }
293
294 void FakeNodeForForEachLoop::finished()
295 {
296   _loop->setState(YACS::DONE);
297 }
298
299 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
300 {
301   std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
302   if(sz1!=sz2)
303     throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
304   for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
305     {
306       const SequenceAny *elt(*it);
307       if(elt)
308         if(sz!=(std::size_t)elt->size())
309           throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
310     }
311   for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
312     {
313       SequenceAny *elt(*it);
314       if(elt)
315         elt->incrRef();
316     }
317 }
318
319 ForEachLoopPassedData::~ForEachLoopPassedData()
320 {
321   for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
322     {
323       SequenceAny *elt(*it);
324       if(elt)
325         elt->decrRef();
326     }
327 }
328
329 void ForEachLoopPassedData::init()
330 {
331   _flagsIds.clear();
332 }
333
334 void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
335 {
336   if(nbOfElts<0)
337     throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
338   std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
339   if(nbOfElts2<sizeExp)
340     throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
341   for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
342     {
343       if((*it)>=nbOfElts2)
344         throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
345     }
346   _flagsIds.resize(nbOfElts);
347   std::fill(_flagsIds.begin(),_flagsIds.end(),false);
348   for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
349     {
350       if(*it<nbOfElts)
351         {
352           if(!_flagsIds[*it])
353             _flagsIds[*it]=true;
354           else
355             {
356               std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
357               throw YACS::Exception(oss.str());
358             }
359         }
360       else
361         {
362           std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," <<  nbOfElts << ") !";
363           throw YACS::Exception(oss.str());
364         }
365     }
366 }
367
368 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
369 {
370   std::size_t sz(_nameOfOutputs.size());
371   if(sz!=ports.size())
372     throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
373   for(std::size_t i=0;i<sz;i++)
374     {
375       AnyInputPort *elt(ports[i]);
376       if(!elt)
377         throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
378       if(_nameOfOutputs[i]!=elt->getName())
379         {
380           std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
381           throw YACS::Exception(oss.str());
382         }
383     }
384 }
385
386 /*!
387  * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
388  */
389 int ForEachLoopPassedData::toAbsId(int localId) const
390 {
391   if(localId<0)
392     throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
393   int ret(0),curLocId(0);
394   for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
395     {
396       if(!*it)
397         {
398           if(localId==curLocId)
399             return ret;
400           curLocId++;
401         }
402     }
403   throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
404 }
405
406 /*!
407  * Equivalent to toAbsId except that only ON are considered here.
408  */
409 int ForEachLoopPassedData::toAbsIdNot(int localId) const
410 {
411   if(localId<0)
412     throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
413   int ret(0),curLocId(0);
414   for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
415     {
416       if(*it)//<- diff is here !
417         {
418           if(localId==curLocId)
419             return ret;
420           curLocId++;
421         }
422     }
423   throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
424 }
425
426 int ForEachLoopPassedData::getNumberOfElementsToDo() const
427 {
428   std::size_t nbAllElts(_flagsIds.size());
429   std::size_t ret(nbAllElts-_passedIds.size());
430   return ret;
431 }
432
433 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
434 {
435   std::size_t sz(execVals.size());
436   if(_passedOutputs.size()!=sz)
437     throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
438   for(std::size_t i=0;i<sz;i++)
439     {
440       SequenceAny *elt(_passedOutputs[i]);
441       SequenceAny *eltDestination(execVals[i]);
442       if(!elt)
443         throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
444       unsigned int szOfElt(elt->size());
445       for(unsigned int j=0;j<szOfElt;j++)
446         {
447           AnyPtr elt1((*elt)[j]);
448           int jAbs(toAbsIdNot(j));
449           eltDestination->setEltAtRank(jAbs,elt1);
450         }
451     }
452 }
453
454 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
455                                                                                 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
456                                                                                 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
457 {
458 }
459
460 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
461                                                                                            _splitterNode(other._splitterNode,this),
462                                                                                            _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
463 {
464   int i=0;
465   if(!editionOnly)
466     for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
467       {
468         AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
469         InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
470         temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
471         interc->setRepr(temp);
472         _outGoingPorts.push_back(temp);
473         _intecptrsForOutGoingPorts.push_back(interc);
474       }
475 }
476
477 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
478 {
479   return new ForEachLoop(*this,father,editionOnly);
480 }
481
482 ForEachLoop::~ForEachLoop()
483 {
484   cleanDynGraph();
485   for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
486     delete *iter;
487   for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
488     delete *iter2;
489   delete _passedData;
490 }
491
492 void ForEachLoop::init(bool start)
493 {
494   DynParaLoop::init(start);
495   _splitterNode.init(start);
496   _execCurrentId=0;
497   cleanDynGraph();
498   _currentIndex = 0;
499   exUpdateProgress();
500   if(_passedData)
501     _passedData->init();
502 }
503
504 void ForEachLoop::exUpdateState()
505 {
506   DEBTRACE("ForEachLoop::exUpdateState");
507   if(_state == YACS::DISABLED)
508     return;
509   if(_state == YACS::DONE)
510     return;
511   if(_inGate.exIsReady())
512     {
513       //internal graph update
514       int i;
515       int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
516       if(_passedData)
517         {
518           _passedData->checkCompatibilyWithNb(nbOfElts);
519           nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
520         }
521       int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
522
523       DEBTRACE("nbOfElts=" << nbOfElts);
524       DEBTRACE("nbOfBr=" << nbOfBr);
525
526       if(nbOfEltsToDo==0)
527         {
528           prepareSequenceValues(0);
529           delete _nodeForSpecialCases;
530           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
531           setState(YACS::ACTIVATED);
532           return ;
533         }
534       if(nbOfBr<=0)
535         {
536           delete _nodeForSpecialCases;
537           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
538           setState(YACS::ACTIVATED);
539           return ;
540         }
541       if(nbOfBr>nbOfEltsToDo)
542         nbOfBr=nbOfEltsToDo;
543       _execNodes.resize(nbOfBr);
544       _execIds.resize(nbOfBr);
545       _execOutGoingPorts.resize(nbOfBr);
546       prepareSequenceValues(nbOfElts);
547       if(_initNode)
548         _execInitNodes.resize(nbOfBr);
549       _initializingCounter = 0;
550       if (_finalizeNode)
551         _execFinalizeNodes.resize(nbOfBr);
552
553       vector<Node *> origNodes;
554       origNodes.push_back(_initNode);
555       origNodes.push_back(_node);
556       origNodes.push_back(_finalizeNode);
557
558       //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors 
559       //so catch them to control errors
560       try
561         {
562           for(i=0;i<nbOfBr;i++)
563             {
564               DEBTRACE( "-------------- 2" );
565               vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
566               if(_initNode)
567                 _execInitNodes[i] = clonedNodes[0];
568               _execNodes[i] = clonedNodes[1];
569               if(_finalizeNode)
570                 _execFinalizeNodes[i] = clonedNodes[2];
571               DEBTRACE( "-------------- 4" );
572               prepareInputsFromOutOfScope(i);
573               DEBTRACE( "-------------- 5" );
574               createOutputOutOfScopeInterceptors(i);
575               DEBTRACE( "-------------- 6" );
576             }
577           for(i=0;i<nbOfBr;i++)
578             {
579               DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
580               _execIds[i]=_execCurrentId;
581               int posInAbs(_execCurrentId);
582               if(_passedData)
583                 posInAbs=_passedData->toAbsId(_execCurrentId);
584               _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
585               _execCurrentId++;
586               DEBTRACE( "-------------- 7" );
587             }
588           if(_passedData)
589             {
590               _passedData->checkLevel2(_execOutGoingPorts[0]);
591               _passedData->assignAlreadyDone(_execVals);
592             }
593         }
594       catch(YACS::Exception& ex)
595         {
596           //ForEachLoop must be put in error and the exception rethrown to notify the caller
597           DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
598           setState(YACS::ERROR);
599           exForwardFailed();
600           throw;
601         }
602
603       setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
604
605       //let's go
606       for(i=0;i<nbOfBr;i++)
607         if(_initNode)
608           {
609             _execInitNodes[i]->exUpdateState();
610             _initializingCounter++;
611           }
612         else
613           {
614             _nbOfEltConsumed++;
615             _execNodes[i]->exUpdateState();
616           }
617
618       forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
619     }
620 }
621
622 void ForEachLoop::exUpdateProgress()
623 {
624   // emit notification to all observers registered with the dispatcher on any change of the node's state
625   sendEvent("progress");
626 }
627
628 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
629 {
630   if(!_node)
631     return;
632   if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
633   if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
634     {
635       if(_nodeForSpecialCases)
636         {
637           _nodeForSpecialCases->getReadyTasks(tasks);
638           return ;
639         }
640       vector<Node *>::iterator iter;
641       for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
642         (*iter)->getReadyTasks(tasks);
643       for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
644         (*iter)->getReadyTasks(tasks);
645       for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
646         (*iter)->getReadyTasks(tasks);
647     }
648 }
649
650 int ForEachLoop::getNumberOfInputPorts() const
651 {
652   return DynParaLoop::getNumberOfInputPorts()+1;
653 }
654
655 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
656 {
657   //TO DO
658 }
659
660 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
661 {
662 }
663
664 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
665 {
666   list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
667   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
668   return ret;
669 }
670
671 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
672 {
673   list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
674   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
675   return ret;
676 }
677
678 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
679 {
680   if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
681     return (InputPort *)&_splitterNode._dataPortToDispatch;
682   else
683     return DynParaLoop::getInputPort(name);
684 }
685
686 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
687 {
688   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
689     {
690       if(name==(*iter)->getName())
691         return (OutputPort *)(*iter);
692     }
693   return DynParaLoop::getOutputPort(name);
694 }
695
696 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
697 {
698   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
699     {
700       if(name==(*iter)->getName())
701         return (OutPort *)(*iter);
702     }
703   return DynParaLoop::getOutPort(name);
704 }
705
706 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
707 {
708   if(name==NAME_OF_SPLITTERNODE)
709     return (Node *)&_splitterNode;
710   else
711     return DynParaLoop::getChildByShortName(name);
712 }
713
714 //! Method used to notify the node that a child node has finished
715 /*!
716  * Update the current state and return the change state
717  *
718  *  \param node : the child node that has finished
719  *  \return the state change
720  */
721 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
722 {
723   DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
724   unsigned int id;
725   switch(getIdentityOfNotifyerNode(node,id))
726     {
727     case INIT_NODE:
728       return updateStateForInitNodeOnFinishedEventFrom(node,id);
729     case WORK_NODE:
730       return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
731     case FINALIZE_NODE:
732       return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
733     default:
734       YASSERT(false);
735     }
736   return YACS::NOEVENT;
737 }
738
739 YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
740 {
741   _execNodes[id]->exUpdateState();
742   _nbOfEltConsumed++;
743   _initializingCounter--;
744   if (_initializingCounter == 0)
745     _initNode->setState(DONE);
746   return YACS::NOEVENT;
747 }
748
749 /*!
750  * \param [in] isNormalFinish - if true
751  */
752 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
753 {
754   _currentIndex++;
755   exUpdateProgress();
756   if(isNormalFinish)
757     {
758       int globalId(_execIds[id]);
759       if(_passedData)
760         globalId=_passedData->toAbsId(globalId);
761       storeOutValsInSeqForOutOfScopeUse(globalId,id);
762     }
763   //
764   if(_execCurrentId==getFinishedId())
765     {//No more elements of _dataPortToDispatch to treat
766       _execIds[id]=NOT_RUNNING_BRANCH_ID;
767       //analyzing if some samples are still on treatment on other branches.
768       bool isFinished(true);
769       for(int i=0;i<_execIds.size() && isFinished;i++)
770         isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
771       if(isFinished)
772         {
773           try
774           {
775               if(_failedCounter!=0)
776                 {// case of keepgoing mode + a failed
777                   std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
778                   DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
779                   setState(YACS::FAILED);
780                   return YACS::ABORT;
781                 }
782               pushAllSequenceValues();
783
784               if (_node)
785                 {
786                   _node->setState(YACS::DONE);
787
788                   ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
789                   if (compNode)
790                     {
791                       std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
792                       std::list<Node *>::iterator iter=aChldn.begin();
793                       for(;iter!=aChldn.end();iter++)
794                         (*iter)->setState(YACS::DONE);
795                     }
796                 }
797
798               if (_finalizeNode == NULL)
799                 {
800                   // No finalize node, we just finish the loop at the end of exec nodes execution
801                   setState(YACS::DONE);
802                   return YACS::FINISH;
803                 }
804               else
805                 {
806                   // Run the finalize nodes, the loop will be done only when they all finish
807                   _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
808                   for (int i=0 ; i<_execIds.size() ; i++)
809                     {
810                       YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
811                       DEBTRACE("Launching finalize node for branch " << i);
812                       _execFinalizeNodes[i]->exUpdateState();
813                       _unfinishedCounter++;
814                     }
815                   return YACS::NOEVENT;
816                 }
817           }
818           catch(YACS::Exception& ex)
819           {
820               DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
821               //no way to push results : put following nodes in FAILED state
822               //TODO could be more fine grain : put only concerned nodes in FAILED state
823               exForwardFailed();
824               setState(YACS::ERROR);
825               return YACS::ABORT;
826           }
827         }
828     }
829   else if(_state == YACS::ACTIVATED)
830     {//more elements to do and loop still activated
831       _execIds[id]=_execCurrentId;
832       node->init(false);
833       int posInAbs(_execCurrentId);
834       if(_passedData)
835         posInAbs=_passedData->toAbsId(_execCurrentId);
836       _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
837       _execCurrentId++;
838       node->exUpdateState();
839       forwardExecStateToOriginalBody(node);
840       _nbOfEltConsumed++;
841     }
842   else
843     {//elements to process and loop no more activated
844       DEBTRACE("foreach loop state " << _state);
845     }
846   return YACS::NOEVENT;
847 }
848
849 YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
850 {
851   DEBTRACE("Finalize node finished on branch " << id);
852   _unfinishedCounter--;
853   _currentIndex++;
854   exUpdateProgress();
855   DEBTRACE(_unfinishedCounter << " finalize nodes still running");
856   if (_unfinishedCounter == 0)
857     {
858       _finalizeNode->setState(YACS::DONE);
859       setState(YACS::DONE);
860       return YACS::FINISH;
861     }
862   else
863     return YACS::NOEVENT;
864 }
865
866 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
867 {
868   unsigned int id;
869   DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
870   if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
871     return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
872   else
873     {
874       _failedCounter++;
875       return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
876     }
877 }
878
879 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
880 {
881   DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
882   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
883   if(typeOfPortInstance==OutputPort::NAME)
884     {
885       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
886       int i=0;
887       for(;iter!=_outGoingPorts.end();iter++,i++)
888         if((*iter)->getRepr()==port.first || *iter==port.first)
889           break;
890       if(iter!=_outGoingPorts.end())
891         {
892           if(*iter!=port.first)
893             {
894               (*iter)->incrRef();
895               (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
896             }
897           port.first=*iter;
898         }
899       else
900         {
901           TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
902           // The out going ports belong to the ForEachLoop, whereas
903           // the delegated port belong to a node child of the ForEachLoop.
904           // The name of the delegated port contains dots (bloc.node.outport),
905           // whereas the name of the out going port shouldn't do.
906           std::string outputPortName = getPortName(port.first);
907           std::replace_if (outputPortName.begin(), outputPortName.end(),
908                            std::bind1st(std::equal_to<char>(), '.'), '_');
909           outputPortName += "_interceptor";
910           AnySplitOutputPort *newPort=new AnySplitOutputPort(outputPortName,this,newTc);
911           InterceptorInputPort *intercptor=new InterceptorInputPort(outputPortName + "_in",this,port.first->edGetType());
912           intercptor->setRepr(newPort);
913           newTc->decrRef();
914           newPort->addRepr(port.first,intercptor);
915           _outGoingPorts.push_back(newPort);
916           _intecptrsForOutGoingPorts.push_back(intercptor);
917           port.first=newPort;
918         }
919     }
920   else
921     throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
922 }
923
924 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
925 {
926   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
927   if(typeOfPortInstance==OutputPort::NAME)
928     {
929       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
930       for(;iter!=_outGoingPorts.end();iter++)
931         if((*iter)->getRepr()==port.first)
932           break;
933       if(iter==_outGoingPorts.end())
934         {
935           string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name; 
936           throw Exception(what);
937         }
938       else
939         port.first=(*iter);
940     }
941   else
942     throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
943 }
944
945 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
946 {
947   string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
948   if(typeOfPortInstance==OutputPort::NAME)
949     {
950       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
951       vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
952       for(;iter!=_outGoingPorts.end();iter++,iter2++)
953         if((*iter)->getRepr()==portDwn)
954           break;
955       //ASSERT(portUp==*iter.second)
956       if((*iter)->decrRef())
957         {
958           AnySplitOutputPort *p=*iter;
959           _outGoingPorts.erase(iter);
960           delete p;
961           InterceptorInputPort *ip=*iter2;
962           _intecptrsForOutGoingPorts.erase(iter2);
963           delete ip;
964         }
965     }
966 }
967
968 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
969 {
970   string portName, nodeName;
971   splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
972   Node *staticChild = getChildByName(nodeName);
973   return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
974   //that a link starting from _initNode goes out of scope of 'this'.
975 }
976
977 void ForEachLoop::cleanDynGraph()
978 {
979   DynParaLoop::cleanDynGraph();
980   for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
981     (*iter3)->decrRef();
982   _execVals.clear();
983   for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
984     for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
985       delete *iter5;
986   _execOutGoingPorts.clear();
987 }
988
989 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
990 {
991   vector<AnyInputPort *>::iterator iter;
992   int i=0;
993   for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
994     {
995       Any *val=(Any *)(*iter)->getValue();
996       _execVals[i]->setEltAtRank(rank,val);
997     }
998 }
999
1000 int ForEachLoop::getFinishedId()
1001 {
1002   if(!_passedData)
1003     return _splitterNode.getNumberOfElements();
1004   else
1005     return _passedData->getNumberOfElementsToDo();
1006 }
1007
1008 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
1009 {
1010   _execVals.resize(_outGoingPorts.size());
1011   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1012   for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1013     _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1014 }
1015
1016 void ForEachLoop::pushAllSequenceValues()
1017 {
1018   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1019   int i=0;
1020   for(;iter!=_outGoingPorts.end();iter++,i++)
1021     (*iter)->put((const void *)_execVals[i]);
1022 }
1023
1024 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
1025 {
1026   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1027   int i=0;
1028   for(;iter!=_outGoingPorts.end();iter++,i++)
1029     {
1030       DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1031       //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1032       OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1033       DEBTRACE( portOut->getName() );
1034       AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
1035       portOut->addInPort(interceptor);
1036       _execOutGoingPorts[branchNb].push_back(interceptor);
1037     }
1038 }
1039
1040 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1041                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
1042 {
1043   if(isInMyDescendance(start->getNode())==_node)
1044     throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
1045 }
1046
1047 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
1048 {
1049   list<OutputPort *> ret;
1050   ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT)); 
1051   return ret;
1052 }
1053
1054 void ForEachLoop::accept(Visitor *visitor)
1055 {
1056   visitor->visitForEachLoop(this);
1057 }
1058
1059 //! Dump the node state to a stream
1060 /*!
1061  * \param os : the output stream
1062  */
1063 void ForEachLoop::writeDot(std::ostream &os) const
1064 {
1065   os << "  subgraph cluster_" << getId() << "  {\n" ;
1066   //only one node in a loop
1067   if(_node)
1068     {
1069       _node->writeDot(os);
1070       os << getId() << " -> " << _node->getId() << ";\n";
1071     }
1072   os << "}\n" ;
1073   os << getId() << "[fillcolor=\"" ;
1074   YACS::StatesForNode state=getEffectiveState();
1075   os << getColorState(state);
1076   os << "\" label=\"" << "Loop:" ;
1077   os << getName() <<"\"];\n";
1078 }
1079
1080 //! Reset the state of the node and its children depending on the parameter level
1081 void ForEachLoop::resetState(int level)
1082 {
1083   if(level==0)return;
1084   DynParaLoop::resetState(level);
1085   _execCurrentId=0;
1086   //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object) 
1087   cleanDynGraph();
1088 }
1089
1090 std::string ForEachLoop::getProgress() const
1091 {
1092   int nbElems(getNbOfElementsToBeProcessed());
1093   std::stringstream aProgress;
1094   if (nbElems > 0)
1095     aProgress << _currentIndex << "/" << nbElems;
1096   else
1097     aProgress << "0";
1098   return aProgress.str();
1099 }
1100
1101 int ForEachLoop::getNbOfElementsToBeProcessed() const
1102 {
1103   return _splitterNode.getNumberOfElements();
1104 }
1105
1106 /*!
1107  * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
1108  * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
1109  * This method has one input \a execut and 3 outputs.
1110  *
1111  * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
1112  * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
1113  *                        All of the items in \a outputs have the same size.
1114  * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
1115  * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
1116  *         SequenceAny objects contained in \a outputs.
1117  *
1118  * \sa edGetSeqOfSamplesPort
1119  */
1120 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1121 {
1122   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
1123   if(_execVals.empty())
1124     return std::vector<unsigned int>();
1125   if(_execOutGoingPorts.empty())
1126     return std::vector<unsigned int>();
1127   std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
1128   const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1129   for(std::size_t i=0;i<sz;i++)
1130     {
1131       outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1132       nameOfOutputs[i]=ports[i]->getName();
1133     }
1134   return _execVals[0]->getSetItems();
1135 }
1136
1137 /*!
1138  * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
1139  * getPassedResults method.
1140  */
1141 void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1142 {
1143   delete _passedData;
1144   _failedCounter=0;
1145   _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
1146 }
1147