Salome HOME
e8bab99d16aba36c02572d23391dfcdb49f90db0
[modules/yacs.git] / src / engine / ForEachLoop.cxx
1 // Copyright (C) 2006-2015  CEA/DEN, EDF R&D
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 //
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
18 //
19
20 #include "ForEachLoop.hxx"
21 #include "TypeCode.hxx"
22 #include "Visitor.hxx"
23 #include "ComposedNode.hxx"
24 #include "Executor.hxx"
25 #include "AutoLocker.hxx"
26
27 #include <iostream>
28 #include <iomanip>
29 #include <sstream>
30
31 //#define _DEVDEBUG_
32 #include "YacsTrace.hxx"
33
34 using namespace YACS::ENGINE;
35 using namespace std;
36
37 /*! \class YACS::ENGINE::ForEachLoop
38  *  \brief Loop node for parametric calculation
39  *
40  *  \ingroup Nodes
41  */
42
43 const char FakeNodeForForEachLoop::NAME[]="thisIsAFakeNode";
44
45 const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
46
47 const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
48
49 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
50
51 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
52                                                                                                 DataPort(name,node,type),Port(node),
53                                                                                                 _repr(0)
54 {
55 }
56
57 InterceptorInputPort::InterceptorInputPort(const  InterceptorInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),
58                                                                                                 Port(other,newHelder),
59                                                                                                 _repr(0)
60 {
61 }
62
63 void InterceptorInputPort::getAllRepresentants(std::set<InPort *>& repr) const
64 {
65   set<InPort *> ports=_repr->edSetInPort();
66   for(set<InPort *>::iterator iter=ports.begin();iter!=ports.end();iter++)
67     (*iter)->getAllRepresentants(repr);
68 }
69
70 InputPort *InterceptorInputPort::clone(Node *newHelder) const
71 {
72   return new InterceptorInputPort(*this,newHelder);
73 }
74
75 void InterceptorInputPort::setRepr(AnySplitOutputPort *repr)
76 {
77   _repr=repr;
78 }
79
80 bool AnySplitOutputPort::decrRef()
81 {
82   return (--_cnt==0);
83 }
84
85 void AnySplitOutputPort::incrRef() const
86 {
87   _cnt++;
88 }
89
90 AnySplitOutputPort::AnySplitOutputPort(const std::string& name, Node *node, TypeCode *type):OutputPort(name,node,type),
91                                                                                             DataPort(name,node,type),Port(node),
92                                                                                             _repr(0),_intercptr(0),_cnt(1)
93 {
94 }
95
96 AnySplitOutputPort::AnySplitOutputPort(const AnySplitOutputPort& other, Node *newHelder):OutputPort(other,newHelder),
97                                                                                          DataPort(other,newHelder),
98                                                                                          Port(other,newHelder),
99                                                                                          _repr(0),_intercptr(0),_cnt(1)
100 {
101 }
102
103 bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
104 {
105   bool ret=OutputPort::addInPort(inPort);
106   if(_repr)
107     _repr->addInPort(_intercptr);
108   return ret;
109 }
110
111 void AnySplitOutputPort::getAllRepresented(std::set<OutPort *>& represented) const
112 {
113   if(!_repr)
114     OutPort::getAllRepresented(represented);
115   else
116     _repr->getAllRepresented(represented);
117 }
118
119 int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
120 {
121   bool ret=OutputPort::removeInPort(inPort,forward);
122   if(_repr)
123     if(_setOfInputPort.empty())
124       _repr->removeInPort(_intercptr,forward);
125   return ret;
126 }
127
128 void AnySplitOutputPort::addRepr(OutPort *repr, InterceptorInputPort *intercptr)
129 {
130   _repr=repr;
131   _intercptr=intercptr;
132 }
133
134 OutputPort *AnySplitOutputPort::clone(Node *newHelder) const
135 {
136   return new AnySplitOutputPort(*this,newHelder);
137 }
138
139 SeqAnyInputPort::SeqAnyInputPort(const std::string& name, Node *node, TypeCodeSeq* type):AnyInputPort(name,node,type),DataPort(name,node,type),Port(node)
140 {
141   _type->decrRef();
142 }
143
144 SeqAnyInputPort::SeqAnyInputPort(const  SeqAnyInputPort& other, Node *newHelder):AnyInputPort(other,newHelder),DataPort(other,newHelder),Port(other,newHelder)
145 {
146 }
147
148 InputPort *SeqAnyInputPort::clone(Node *newHelder) const
149 {
150   return new SeqAnyInputPort(*this,newHelder);
151 }
152
153 unsigned SeqAnyInputPort::getNumberOfElements() const
154 {
155   const SequenceAny * valCsted=(const SequenceAny *) _value;
156   if (valCsted) return valCsted->size();
157   return 0;
158 }
159
160 Any *SeqAnyInputPort::getValueAtRank(int i) const
161 {
162   const SequenceAny * valCsted=(const SequenceAny *) _value;
163   AnyPtr ret=(*valCsted)[i];
164   ret->incrRef();
165   return ret;
166 }
167
168 std::string SeqAnyInputPort::dump()
169 {
170   stringstream xmldump;
171   int nbElem = getNumberOfElements();
172   xmldump << "<value><array><data>" << endl;
173   for (int i = 0; i < nbElem; i++)
174     {
175       Any *val = getValueAtRank(i);
176       switch (val->getType()->kind())
177         {
178         case Double:
179           xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
180           break;
181         case Int:
182           xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
183           break;
184         case Bool:
185           xmldump << "<value><boolean>" << val->getBoolValue() << "</boolean></value>" << endl;
186           break;
187         case String:
188           xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
189           break;
190         case Objref:
191           xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
192           break;
193         default:
194           xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
195           break;
196         }
197     }
198   xmldump << "</data></array></value>" << endl;
199   return xmldump.str();
200 }
201
202 SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData, 
203                            ForEachLoop *father):ElementaryNode(name),
204                                                 _dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
205                                                                     this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
206 {
207   _father=father;
208 }
209
210 SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
211                                                                            _dataPortToDispatch(other._dataPortToDispatch,this)
212 {
213 }
214
215 InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
216 {
217   if(name==NAME_OF_SEQUENCE_INPUT)
218     return (InputPort *)&_dataPortToDispatch;
219   else
220     return ElementaryNode::getInputPort(name);
221 }
222
223 Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
224 {
225   return new SplitterNode(*this,(ForEachLoop *)father);
226 }
227
228 unsigned SplitterNode::getNumberOfElements() const
229 {
230   return _dataPortToDispatch.getNumberOfElements();
231 }
232
233 void SplitterNode::execute()
234 {
235   //Nothing : should never been called elsewhere big problem...
236 }
237
238 void SplitterNode::init(bool start)
239 {
240   ElementaryNode::init(start);
241   _dataPortToDispatch.exInit(start);
242 }
243
244 void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
245 {
246   Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
247   ForEachLoop *fatherTyped=(ForEachLoop *)_father;
248   fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
249   valueToDispatch->decrRef();
250 }
251
252 FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
253                                                                                      _loop(loop),
254                                                                                      _normalFinish(normalFinish)
255 {
256   _state=YACS::TOACTIVATE;
257   _father=_loop->getFather();
258 }
259
260 FakeNodeForForEachLoop::FakeNodeForForEachLoop(const FakeNodeForForEachLoop& other):ElementaryNode(other),_loop(0),
261                                                                                     _normalFinish(false)
262 {
263 }
264
265 Node *FakeNodeForForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
266 {
267   return new FakeNodeForForEachLoop(*this);
268 }
269
270 void FakeNodeForForEachLoop::exForwardFailed()
271 {
272   _loop->exForwardFailed();
273 }
274
275 void FakeNodeForForEachLoop::exForwardFinished()
276
277   _loop->exForwardFinished();
278 }
279
280 void FakeNodeForForEachLoop::execute()
281 {
282   if(!_normalFinish)
283     throw Exception("");//only to trigger ABORT on Executor
284   else
285     _loop->pushAllSequenceValues();
286 }
287
288 void FakeNodeForForEachLoop::aborted()
289 {
290   _loop->setState(YACS::ERROR);
291 }
292
293 void FakeNodeForForEachLoop::finished()
294 {
295   _loop->setState(YACS::DONE);
296 }
297
298 ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
299 {
300   std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
301   if(sz1!=sz2)
302     throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
303   for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
304     {
305       const SequenceAny *elt(*it);
306       if(elt)
307         if(sz!=(std::size_t)elt->size())
308           throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
309     }
310   for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
311     {
312       SequenceAny *elt(*it);
313       if(elt)
314         elt->incrRef();
315     }
316 }
317
318 ForEachLoopPassedData::~ForEachLoopPassedData()
319 {
320   for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
321     {
322       SequenceAny *elt(*it);
323       if(elt)
324         elt->decrRef();
325     }
326 }
327
328 void ForEachLoopPassedData::init()
329 {
330   _flagsIds.clear();
331 }
332
333 void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
334 {
335   if(nbOfElts<0)
336     throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
337   std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
338   if(nbOfElts2<sizeExp)
339     throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
340   for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
341     {
342       if((*it)>=nbOfElts2)
343         throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
344     }
345   _flagsIds.resize(nbOfElts);
346   std::fill(_flagsIds.begin(),_flagsIds.end(),false);
347   for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
348     {
349       if(*it<nbOfElts)
350         {
351           if(!_flagsIds[*it])
352             _flagsIds[*it]=true;
353           else
354             {
355               std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
356               throw YACS::Exception(oss.str());
357             }
358         }
359       else
360         {
361           std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," <<  nbOfElts << ") !";
362           throw YACS::Exception(oss.str());
363         }
364     }
365 }
366
367 void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
368 {
369   std::size_t sz(_nameOfOutputs.size());
370   if(sz!=ports.size())
371     throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
372   for(std::size_t i=0;i<sz;i++)
373     {
374       AnyInputPort *elt(ports[i]);
375       if(!elt)
376         throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
377       if(_nameOfOutputs[i]!=elt->getName())
378         {
379           std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
380           throw YACS::Exception(oss.str());
381         }
382     }
383 }
384
385 /*!
386  * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
387  */
388 int ForEachLoopPassedData::toAbsId(int localId) const
389 {
390   if(localId<0)
391     throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
392   int ret(0),curLocId(0);
393   for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
394     {
395       if(!*it)
396         {
397           if(localId==curLocId)
398             return ret;
399           curLocId++;
400         }
401     }
402   throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
403 }
404
405 /*!
406  * Equivalent to toAbsId except that only ON are considered here.
407  */
408 int ForEachLoopPassedData::toAbsIdNot(int localId) const
409 {
410   if(localId<0)
411     throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
412   int ret(0),curLocId(0);
413   for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
414     {
415       if(*it)//<- diff is here !
416         {
417           if(localId==curLocId)
418             return ret;
419           curLocId++;
420         }
421     }
422   throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
423 }
424
425 int ForEachLoopPassedData::getNumberOfElementsToDo() const
426 {
427   std::size_t nbAllElts(_flagsIds.size());
428   std::size_t ret(nbAllElts-_passedIds.size());
429   return ret;
430 }
431
432 void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
433 {
434   std::size_t sz(execVals.size());
435   if(_passedOutputs.size()!=sz)
436     throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
437   for(std::size_t i=0;i<sz;i++)
438     {
439       SequenceAny *elt(_passedOutputs[i]);
440       SequenceAny *eltDestination(execVals[i]);
441       if(!elt)
442         throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
443       unsigned int szOfElt(elt->size());
444       for(unsigned int j=0;j<szOfElt;j++)
445         {
446           AnyPtr elt1((*elt)[j]);
447           int jAbs(toAbsIdNot(j));
448           eltDestination->setEltAtRank(jAbs,elt1);
449         }
450     }
451 }
452
453 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
454                                                                                 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
455                                                                                 _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
456 {
457 }
458
459 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
460                                                                                            _splitterNode(other._splitterNode,this),
461                                                                                            _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
462 {
463   int i=0;
464   if(!editionOnly)
465     for(vector<AnySplitOutputPort *>::const_iterator iter2=other._outGoingPorts.begin();iter2!=other._outGoingPorts.end();iter2++,i++)
466       {
467         AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
468         InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
469         temp->addRepr(getOutPort((*iter2)->getName()),interc);
470         interc->setRepr(temp);
471         _outGoingPorts.push_back(temp);
472         _intecptrsForOutGoingPorts.push_back(interc);
473       }
474 }
475
476 Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
477 {
478   return new ForEachLoop(*this,father,editionOnly);
479 }
480
481 ForEachLoop::~ForEachLoop()
482 {
483   cleanDynGraph();
484   for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
485     delete *iter;
486   for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
487     delete *iter2;
488   delete _passedData;
489 }
490
491 void ForEachLoop::init(bool start)
492 {
493   DynParaLoop::init(start);
494   _splitterNode.init(start);
495   _execCurrentId=0;
496   cleanDynGraph();
497   _currentIndex = 0;
498   exUpdateProgress();
499   if(_passedData)
500     _passedData->init();
501 }
502
503 void ForEachLoop::exUpdateState()
504 {
505   DEBTRACE("ForEachLoop::exUpdateState");
506   if(_state == YACS::DISABLED)
507     return;
508   if(_inGate.exIsReady())
509     {
510       //internal graph update
511       int i;
512       int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
513       if(_passedData)
514         {
515           _passedData->checkCompatibilyWithNb(nbOfElts);
516           nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
517         }
518       int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
519
520       DEBTRACE("nbOfElts=" << nbOfElts);
521       DEBTRACE("nbOfBr=" << nbOfBr);
522
523       if(nbOfEltsToDo==0)
524         {
525           prepareSequenceValues(0);
526           delete _nodeForSpecialCases;
527           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,true);
528           setState(YACS::ACTIVATED);
529           return ;
530         }
531       if(nbOfBr<=0)
532         {
533           delete _nodeForSpecialCases;
534           _nodeForSpecialCases=new FakeNodeForForEachLoop(this,getAllOutPortsLeavingCurrentScope().empty());
535           setState(YACS::ACTIVATED);
536           return ;
537         }
538       if(nbOfBr>nbOfEltsToDo)
539         nbOfBr=nbOfEltsToDo;
540       _execNodes.resize(nbOfBr);
541       _execIds.resize(nbOfBr);
542       _execOutGoingPorts.resize(nbOfBr);
543       prepareSequenceValues(nbOfElts);
544       if(_initNode)
545         _execInitNodes.resize(nbOfBr);
546       _initializingCounter = 0;
547       if (_finalizeNode)
548         _execFinalizeNodes.resize(nbOfBr);
549
550       vector<Node *> origNodes;
551       origNodes.push_back(_initNode);
552       origNodes.push_back(_node);
553       origNodes.push_back(_finalizeNode);
554
555       //Conversion exceptions can be thrown by createOutputOutOfScopeInterceptors 
556       //so catch them to control errors
557       try
558         {
559           for(i=0;i<nbOfBr;i++)
560             {
561               DEBTRACE( "-------------- 2" );
562               vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
563               if(_initNode)
564                 _execInitNodes[i] = clonedNodes[0];
565               _execNodes[i] = clonedNodes[1];
566               if(_finalizeNode)
567                 _execFinalizeNodes[i] = clonedNodes[2];
568               DEBTRACE( "-------------- 4" );
569               prepareInputsFromOutOfScope(i);
570               DEBTRACE( "-------------- 5" );
571               createOutputOutOfScopeInterceptors(i);
572               DEBTRACE( "-------------- 6" );
573             }
574           for(i=0;i<nbOfBr;i++)
575             {
576               DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
577               _execIds[i]=_execCurrentId;
578               int posInAbs(_execCurrentId);
579               if(_passedData)
580                 posInAbs=_passedData->toAbsId(_execCurrentId);
581               _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
582               _execCurrentId++;
583               DEBTRACE( "-------------- 7" );
584             }
585           if(_passedData)
586             {
587               _passedData->checkLevel2(_execOutGoingPorts[0]);
588               _passedData->assignAlreadyDone(_execVals);
589             }
590         }
591       catch(YACS::Exception& ex)
592         {
593           //ForEachLoop must be put in error and the exception rethrown to notify the caller
594           DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
595           setState(YACS::ERROR);
596           exForwardFailed();
597           throw;
598         }
599
600       setState(YACS::ACTIVATED); // move the calling of setState method there for adding observers for clone nodes in GUI part
601
602       //let's go
603       for(i=0;i<nbOfBr;i++)
604         if(_initNode)
605           {
606             _execInitNodes[i]->exUpdateState();
607             _initializingCounter++;
608           }
609         else
610           {
611             _nbOfEltConsumed++;
612             _execNodes[i]->exUpdateState();
613           }
614
615       forwardExecStateToOriginalBody(_execNodes[nbOfBr-1]);
616     }
617 }
618
619 void ForEachLoop::exUpdateProgress()
620 {
621   // emit notification to all observers registered with the dispatcher on any change of the node's state
622   sendEvent("progress");
623 }
624
625 void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
626 {
627   if(!_node)
628     return;
629   if(_state==YACS::TOACTIVATE) setState(YACS::ACTIVATED);
630   if(_state==YACS::TOACTIVATE || _state==YACS::ACTIVATED)
631     {
632       if(_nodeForSpecialCases)
633         {
634           _nodeForSpecialCases->getReadyTasks(tasks);
635           return ;
636         }
637       vector<Node *>::iterator iter;
638       for (iter=_execNodes.begin() ; iter!=_execNodes.end() ; iter++)
639         (*iter)->getReadyTasks(tasks);
640       for (iter=_execInitNodes.begin() ; iter!=_execInitNodes.end() ; iter++)
641         (*iter)->getReadyTasks(tasks);
642       for (iter=_execFinalizeNodes.begin() ; iter!=_execFinalizeNodes.end() ; iter++)
643         (*iter)->getReadyTasks(tasks);
644     }
645 }
646
647 int ForEachLoop::getNumberOfInputPorts() const
648 {
649   return DynParaLoop::getNumberOfInputPorts()+1;
650 }
651
652 void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
653 {
654   //TO DO
655 }
656
657 void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
658 {
659 }
660
661 std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
662 {
663   list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
664   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
665   return ret;
666 }
667
668 std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
669 {
670   list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
671   ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
672   return ret;
673 }
674
675 InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
676 {
677   if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
678     return (InputPort *)&_splitterNode._dataPortToDispatch;
679   else
680     return DynParaLoop::getInputPort(name);
681 }
682
683 OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
684 {
685   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
686     {
687       if(name==(*iter)->getName())
688         return (OutputPort *)(*iter);
689     }
690   return DynParaLoop::getOutputPort(name);
691 }
692
693 OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
694 {
695   for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
696     {
697       if(name==(*iter)->getName())
698         return (OutPort *)(*iter);
699     }
700   return DynParaLoop::getOutPort(name);
701 }
702
703 Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
704 {
705   if(name==NAME_OF_SPLITTERNODE)
706     return (Node *)&_splitterNode;
707   else
708     return DynParaLoop::getChildByShortName(name);
709 }
710
711 //! Method used to notify the node that a child node has finished
712 /*!
713  * Update the current state and return the change state
714  *
715  *  \param node : the child node that has finished
716  *  \return the state change
717  */
718 YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
719 {
720   DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
721   unsigned int id;
722   switch(getIdentityOfNotifyerNode(node,id))
723     {
724     case INIT_NODE:
725       return updateStateForInitNodeOnFinishedEventFrom(node,id);
726     case WORK_NODE:
727       return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
728     case FINALIZE_NODE:
729       return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
730     default:
731       YASSERT(false);
732     }
733   return YACS::NOEVENT;
734 }
735
736 YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
737 {
738   _execNodes[id]->exUpdateState();
739   _nbOfEltConsumed++;
740   _initializingCounter--;
741   if (_initializingCounter == 0)
742     _initNode->setState(DONE);
743   return YACS::NOEVENT;
744 }
745
746 /*!
747  * \param [in] isNormalFinish - if true
748  */
749 YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
750 {
751   _currentIndex++;
752   exUpdateProgress();
753   if(isNormalFinish)
754     {
755       int globalId(_execIds[id]);
756       if(_passedData)
757         globalId=_passedData->toAbsId(globalId);
758       storeOutValsInSeqForOutOfScopeUse(globalId,id);
759     }
760   //
761   if(_execCurrentId==getFinishedId())
762     {//No more elements of _dataPortToDispatch to treat
763       _execIds[id]=NOT_RUNNING_BRANCH_ID;
764       //analyzing if some samples are still on treatment on other branches.
765       bool isFinished(true);
766       for(int i=0;i<_execIds.size() && isFinished;i++)
767         isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
768       if(isFinished)
769         {
770           try
771           {
772               if(_failedCounter!=0)
773                 {// case of keepgoing mode + a failed
774                   std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
775                   DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
776                   setState(YACS::FAILED);
777                   return YACS::ABORT;
778                 }
779               pushAllSequenceValues();
780
781               if (_node)
782                 {
783                   _node->setState(YACS::DONE);
784
785                   ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
786                   if (compNode)
787                     {
788                       std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
789                       std::list<Node *>::iterator iter=aChldn.begin();
790                       for(;iter!=aChldn.end();iter++)
791                         (*iter)->setState(YACS::DONE);
792                     }
793                 }
794
795               if (_finalizeNode == NULL)
796                 {
797                   // No finalize node, we just finish the loop at the end of exec nodes execution
798                   setState(YACS::DONE);
799                   return YACS::FINISH;
800                 }
801               else
802                 {
803                   // Run the finalize nodes, the loop will be done only when they all finish
804                   _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
805                   for (int i=0 ; i<_execIds.size() ; i++)
806                     {
807                       YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
808                       DEBTRACE("Launching finalize node for branch " << i);
809                       _execFinalizeNodes[i]->exUpdateState();
810                       _unfinishedCounter++;
811                     }
812                   return YACS::NOEVENT;
813                 }
814           }
815           catch(YACS::Exception& ex)
816           {
817               DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
818               //no way to push results : put following nodes in FAILED state
819               //TODO could be more fine grain : put only concerned nodes in FAILED state
820               exForwardFailed();
821               setState(YACS::ERROR);
822               return YACS::ABORT;
823           }
824         }
825     }
826   else if(_state == YACS::ACTIVATED)
827     {//more elements to do and loop still activated
828       _execIds[id]=_execCurrentId;
829       node->init(false);
830       int posInAbs(_execCurrentId);
831       if(_passedData)
832         posInAbs=_passedData->toAbsId(_execCurrentId);
833       _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
834       _execCurrentId++;
835       node->exUpdateState();
836       forwardExecStateToOriginalBody(node);
837       _nbOfEltConsumed++;
838     }
839   else
840     {//elements to process and loop no more activated
841       DEBTRACE("foreach loop state " << _state);
842     }
843   return YACS::NOEVENT;
844 }
845
846 YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
847 {
848   DEBTRACE("Finalize node finished on branch " << id);
849   _unfinishedCounter--;
850   _currentIndex++;
851   exUpdateProgress();
852   DEBTRACE(_unfinishedCounter << " finalize nodes still running");
853   if (_unfinishedCounter == 0)
854     {
855       _finalizeNode->setState(YACS::DONE);
856       setState(YACS::DONE);
857       return YACS::FINISH;
858     }
859   else
860     return YACS::NOEVENT;
861 }
862
863 YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
864 {
865   unsigned int id;
866   DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
867   if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
868     return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
869   else
870     {
871       _failedCounter++;
872       return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
873     }
874 }
875
876 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
877 {
878   DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
879   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
880   if(typeOfPortInstance==OutputPort::NAME)
881     {
882       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
883       int i=0;
884       for(;iter!=_outGoingPorts.end();iter++,i++)
885         if((*iter)->getRepr()==port.first || *iter==port.first)
886           break;
887       if(iter!=_outGoingPorts.end())
888         {
889           if(*iter!=port.first)
890             {
891               (*iter)->incrRef();
892               (*iter)->addRepr(port.first,_intecptrsForOutGoingPorts[i]);
893             }
894           port.first=*iter;
895         }
896       else
897         {
898           TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
899           AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
900           InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
901           intercptor->setRepr(newPort);
902           newTc->decrRef();
903           newPort->addRepr(port.first,intercptor);
904           _outGoingPorts.push_back(newPort);
905           _intecptrsForOutGoingPorts.push_back(intercptor);
906           port.first=newPort;
907         }
908     }
909   else
910     throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
911 }
912
913 void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
914 {
915   string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
916   if(typeOfPortInstance==OutputPort::NAME)
917     {
918       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
919       for(;iter!=_outGoingPorts.end();iter++)
920         if((*iter)->getRepr()==port.first)
921           break;
922       if(iter==_outGoingPorts.end())
923         {
924           string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name; 
925           throw Exception(what);
926         }
927       else
928         port.first=(*iter);
929     }
930   else
931     throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
932 }
933
934 void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
935 {
936   string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
937   if(typeOfPortInstance==OutputPort::NAME)
938     {
939       vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
940       vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();
941       for(;iter!=_outGoingPorts.end();iter++,iter2++)
942         if((*iter)->getRepr()==portDwn)
943           break;
944       //ASSERT(portUp==*iter.second)
945       if((*iter)->decrRef())
946         {
947           AnySplitOutputPort *p=*iter;
948           _outGoingPorts.erase(iter);
949           delete p;
950           InterceptorInputPort *ip=*iter2;
951           _intecptrsForOutGoingPorts.erase(iter2);
952           delete ip;
953         }
954     }
955 }
956
957 OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
958 {
959   string portName, nodeName;
960   splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
961   Node *staticChild = getChildByName(nodeName);
962   return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
963   //that a link starting from _initNode goes out of scope of 'this'.
964 }
965
966 void ForEachLoop::cleanDynGraph()
967 {
968   DynParaLoop::cleanDynGraph();
969   for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
970     (*iter3)->decrRef();
971   _execVals.clear();
972   for(vector< vector<AnyInputPort *> >::iterator iter4=_execOutGoingPorts.begin();iter4!=_execOutGoingPorts.end();iter4++)
973     for(vector<AnyInputPort *>::iterator iter5=(*iter4).begin();iter5!=(*iter4).end();iter5++)
974       delete *iter5;
975   _execOutGoingPorts.clear();
976 }
977
978 void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
979 {
980   vector<AnyInputPort *>::iterator iter;
981   int i=0;
982   for(iter=_execOutGoingPorts[branchNb].begin();iter!=_execOutGoingPorts[branchNb].end();iter++,i++)
983     {
984       Any *val=(Any *)(*iter)->getValue();
985       _execVals[i]->setEltAtRank(rank,val);
986     }
987 }
988
989 int ForEachLoop::getFinishedId()
990 {
991   if(!_passedData)
992     return _splitterNode.getNumberOfElements();
993   else
994     return _passedData->getNumberOfElementsToDo();
995 }
996
997 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
998 {
999   _execVals.resize(_outGoingPorts.size());
1000   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1001   for(int i=0;iter!=_outGoingPorts.end();iter++,i++)
1002     _execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
1003 }
1004
1005 void ForEachLoop::pushAllSequenceValues()
1006 {
1007   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1008   int i=0;
1009   for(;iter!=_outGoingPorts.end();iter++,i++)
1010     (*iter)->put((const void *)_execVals[i]);
1011 }
1012
1013 void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
1014 {
1015   vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
1016   int i=0;
1017   for(;iter!=_outGoingPorts.end();iter++,i++)
1018     {
1019       DEBTRACE( (*iter)->getName() << " " << (*iter)->edGetType()->kind() );
1020       //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
1021       OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
1022       DEBTRACE( portOut->getName() );
1023       AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
1024       portOut->addInPort(interceptor);
1025       _execOutGoingPorts[branchNb].push_back(interceptor);
1026     }
1027 }
1028
1029 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
1030                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
1031 {
1032   if(isInMyDescendance(start->getNode())==_node)
1033     throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
1034 }
1035
1036 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
1037 {
1038   list<OutputPort *> ret;
1039   ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT)); 
1040   return ret;
1041 }
1042
1043 void ForEachLoop::accept(Visitor *visitor)
1044 {
1045   visitor->visitForEachLoop(this);
1046 }
1047
1048 //! Dump the node state to a stream
1049 /*!
1050  * \param os : the output stream
1051  */
1052 void ForEachLoop::writeDot(std::ostream &os) const
1053 {
1054   os << "  subgraph cluster_" << getId() << "  {\n" ;
1055   //only one node in a loop
1056   if(_node)
1057     {
1058       _node->writeDot(os);
1059       os << getId() << " -> " << _node->getId() << ";\n";
1060     }
1061   os << "}\n" ;
1062   os << getId() << "[fillcolor=\"" ;
1063   YACS::StatesForNode state=getEffectiveState();
1064   os << getColorState(state);
1065   os << "\" label=\"" << "Loop:" ;
1066   os << getName() <<"\"];\n";
1067 }
1068
1069 //! Reset the state of the node and its children depending on the parameter level
1070 void ForEachLoop::resetState(int level)
1071 {
1072   if(level==0)return;
1073   DynParaLoop::resetState(level);
1074   _execCurrentId=0;
1075   //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object) 
1076   cleanDynGraph();
1077 }
1078
1079 std::string ForEachLoop::getProgress() const
1080 {
1081   int nbElems = _splitterNode.getNumberOfElements();
1082   std::stringstream aProgress;
1083   if (nbElems > 0)
1084     aProgress << _currentIndex << "/" << nbElems;
1085   else
1086     aProgress << "0";
1087   return aProgress.str();
1088 }
1089
1090 /*!
1091  * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
1092  * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
1093  * This method has one input \a execut and 3 outputs.
1094  *
1095  * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
1096  * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
1097  *                        All of the items in \a outputs have the same size.
1098  * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
1099  * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
1100  *         SequenceAny objects contained in \a outputs.
1101  *
1102  * \sa edGetSeqOfSamplesPort
1103  */
1104 std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
1105 {
1106   YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
1107   if(_execVals.empty())
1108     return std::vector<unsigned int>();
1109   if(_execOutGoingPorts.empty())
1110     return std::vector<unsigned int>();
1111   std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
1112   const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
1113   for(std::size_t i=0;i<sz;i++)
1114     {
1115       outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
1116       nameOfOutputs[i]=ports[i]->getName();
1117     }
1118   return _execVals[0]->getSetItems();
1119 }
1120
1121 /*!
1122  * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
1123  * getPassedResults method.
1124  */
1125 void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
1126 {
1127   delete _passedData;
1128   _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
1129 }
1130