Salome HOME
Merge branch 'V8_5_BR'
[modules/yacs.git] / src / engine / ForEachLoop.cxx
old mode 100755 (executable)
new mode 100644 (file)
index 1dc70d9..838bfb3
@@ -1,4 +1,4 @@
-// Copyright (C) 2006-2014  CEA/DEN, EDF R&D
+// Copyright (C) 2006-2016  CEA/DEN, EDF R&D
 //
 // This library is free software; you can redistribute it and/or
 // modify it under the terms of the GNU Lesser General Public
 #include "TypeCode.hxx"
 #include "Visitor.hxx"
 #include "ComposedNode.hxx"
+#include "Executor.hxx"
+#include "AutoLocker.hxx"
+
 #include <iostream>
+#include <iomanip>
 #include <sstream>
+#include <algorithm>    // std::replace_if
 
 //#define _DEVDEBUG_
 #include "YacsTrace.hxx"
@@ -44,6 +49,8 @@ const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
 
 const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
 
+const char ForEachLoop::INTERCEPTOR_STR[]="_interceptor";
+
 InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
                                                                                                 DataPort(name,node,type),Port(node),
                                                                                                 _repr(0)
@@ -172,7 +179,7 @@ std::string SeqAnyInputPort::dump()
       switch (val->getType()->kind())
         {
         case Double:
-          xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
+          xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
           break;
         case Int:
           xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
@@ -291,15 +298,178 @@ void FakeNodeForForEachLoop::finished()
   _loop->setState(YACS::DONE);
 }
 
+ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
+{
+  std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
+  if(sz1!=sz2)
+    throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
+  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+    {
+      const SequenceAny *elt(*it);
+      if(elt)
+        if(sz!=(std::size_t)elt->size())
+          throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
+    }
+  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+    {
+      SequenceAny *elt(*it);
+      if(elt)
+        elt->incrRef();
+    }
+}
+
+ForEachLoopPassedData::ForEachLoopPassedData(const ForEachLoopPassedData& copy)
+: _passedIds(copy._passedIds),
+  _passedOutputs(copy._passedOutputs),
+  _nameOfOutputs(copy._nameOfOutputs),
+  _flagsIds(copy._flagsIds)
+{
+}
+
+ForEachLoopPassedData::~ForEachLoopPassedData()
+{
+  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+    {
+      SequenceAny *elt(*it);
+      if(elt)
+        elt->decrRef();
+    }
+}
+
+void ForEachLoopPassedData::init()
+{
+  _flagsIds.clear();
+}
+
+void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
+{
+  if(nbOfElts<0)
+    throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
+  std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
+  if(nbOfElts2<sizeExp)
+    throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
+  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+    {
+      if((*it)>=nbOfElts2)
+        throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
+    }
+  _flagsIds.resize(nbOfElts);
+  std::fill(_flagsIds.begin(),_flagsIds.end(),false);
+  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+    {
+      if(*it<nbOfElts)
+        {
+          if(!_flagsIds[*it])
+            _flagsIds[*it]=true;
+          else
+            {
+              std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
+              throw YACS::Exception(oss.str());
+            }
+        }
+      else
+        {
+          std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," <<  nbOfElts << ") !";
+          throw YACS::Exception(oss.str());
+        }
+    }
+}
+
+void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
+{
+  std::size_t sz(_nameOfOutputs.size());
+  if(sz!=ports.size())
+    throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
+  for(std::size_t i=0;i<sz;i++)
+    {
+      AnyInputPort *elt(ports[i]);
+      if(!elt)
+        throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
+      if(_nameOfOutputs[i]!=elt->getName())
+        {
+          std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
+          throw YACS::Exception(oss.str());
+        }
+    }
+}
+
+/*!
+ * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
+ */
+int ForEachLoopPassedData::toAbsId(int localId) const
+{
+  if(localId<0)
+    throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
+  int ret(0),curLocId(0);
+  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+    {
+      if(!*it)
+        {
+          if(localId==curLocId)
+            return ret;
+          curLocId++;
+        }
+    }
+  throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
+}
+
+/*!
+ * Equivalent to toAbsId except that only ON are considered here.
+ */
+int ForEachLoopPassedData::toAbsIdNot(int localId) const
+{
+  if(localId<0)
+    throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
+  int ret(0),curLocId(0);
+  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+    {
+      if(*it)//<- diff is here !
+        {
+          if(localId==curLocId)
+            return ret;
+          curLocId++;
+        }
+    }
+  throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
+}
+
+int ForEachLoopPassedData::getNumberOfElementsToDo() const
+{
+  std::size_t nbAllElts(_flagsIds.size());
+  std::size_t ret(nbAllElts-_passedIds.size());
+  return ret;
+}
+
+void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
+{
+  std::size_t sz(execVals.size());
+  if(_passedOutputs.size()!=sz)
+    throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
+  for(std::size_t i=0;i<sz;i++)
+    {
+      SequenceAny *elt(_passedOutputs[i]);
+      SequenceAny *eltDestination(execVals[i]);
+      if(!elt)
+        throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
+      unsigned int szOfElt(elt->size());
+      for(unsigned int j=0;j<szOfElt;j++)
+        {
+          AnyPtr elt1((*elt)[j]);
+          int jAbs(toAbsIdNot(j));
+          eltDestination->setEltAtRank(jAbs,elt1);
+        }
+    }
+}
+
 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
                                                                                 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
-                                                                                _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+                                                                                _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
 {
 }
 
 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
                                                                                            _splitterNode(other._splitterNode,this),
-                                                                                           _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+                                                                                           _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
 {
   int i=0;
   if(!editionOnly)
@@ -307,7 +477,7 @@ ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool ed
       {
         AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
         InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
-        temp->addRepr(getOutPort((*iter2)->getName()),interc);
+        temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
         interc->setRepr(temp);
         _outGoingPorts.push_back(temp);
         _intecptrsForOutGoingPorts.push_back(interc);
@@ -326,6 +496,7 @@ ForEachLoop::~ForEachLoop()
     delete *iter;
   for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
     delete *iter2;
+  delete _passedData;
 }
 
 void ForEachLoop::init(bool start)
@@ -336,6 +507,8 @@ void ForEachLoop::init(bool start)
   cleanDynGraph();
   _currentIndex = 0;
   exUpdateProgress();
+  if(_passedData)
+    _passedData->init();
 }
 
 void ForEachLoop::exUpdateState()
@@ -343,17 +516,24 @@ void ForEachLoop::exUpdateState()
   DEBTRACE("ForEachLoop::exUpdateState");
   if(_state == YACS::DISABLED)
     return;
+  if(_state == YACS::DONE)
+    return;
   if(_inGate.exIsReady())
     {
       //internal graph update
       int i;
-      int nbOfBr=_nbOfBranches.getIntValue();
-      int nbOfElts=_splitterNode.getNumberOfElements();
+      int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
+      if(_passedData)
+        {
+          _passedData->checkCompatibilyWithNb(nbOfElts);
+          nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
+        }
+      int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
 
       DEBTRACE("nbOfElts=" << nbOfElts);
       DEBTRACE("nbOfBr=" << nbOfBr);
 
-      if(nbOfElts==0)
+      if(nbOfEltsToDo==0)
         {
           prepareSequenceValues(0);
           delete _nodeForSpecialCases;
@@ -368,8 +548,8 @@ void ForEachLoop::exUpdateState()
           setState(YACS::ACTIVATED);
           return ;
         }
-      if(nbOfBr>nbOfElts)
-        nbOfBr=nbOfElts;
+      if(nbOfBr>nbOfEltsToDo)
+        nbOfBr=nbOfEltsToDo;
       _execNodes.resize(nbOfBr);
       _execIds.resize(nbOfBr);
       _execOutGoingPorts.resize(nbOfBr);
@@ -391,8 +571,6 @@ void ForEachLoop::exUpdateState()
         {
           for(i=0;i<nbOfBr;i++)
             {
-              DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
-              _execIds[i]=_execCurrentId;
               DEBTRACE( "-------------- 2" );
               vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
               if(_initNode)
@@ -405,9 +583,23 @@ void ForEachLoop::exUpdateState()
               DEBTRACE( "-------------- 5" );
               createOutputOutOfScopeInterceptors(i);
               DEBTRACE( "-------------- 6" );
-              _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
+            }
+          for(i=0;i<nbOfBr;i++)
+            {
+              DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
+              _execIds[i]=_execCurrentId;
+              int posInAbs(_execCurrentId);
+              if(_passedData)
+                posInAbs=_passedData->toAbsId(_execCurrentId);
+              _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
+              _execCurrentId++;
               DEBTRACE( "-------------- 7" );
-            } 
+            }
+          if(_passedData)
+            {
+              _passedData->checkLevel2(_execOutGoingPorts[0]);
+              _passedData->assignAlreadyDone(_execVals);
+            }
         }
       catch(YACS::Exception& ex)
         {
@@ -543,110 +735,173 @@ YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
   switch(getIdentityOfNotifyerNode(node,id))
     {
     case INIT_NODE:
-      _execNodes[id]->exUpdateState();
-      _nbOfEltConsumed++;
-      _initializingCounter--;
-      if (_initializingCounter == 0) _initNode->setState(DONE);
-      break;
+      return updateStateForInitNodeOnFinishedEventFrom(node,id);
     case WORK_NODE:
-      _currentIndex++;
-      exUpdateProgress();
-      storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
-      if(_execCurrentId==_splitterNode.getNumberOfElements())
-        {//No more elements of _dataPortToDispatch to treat
-          _execIds[id]=NOT_RUNNING_BRANCH_ID;
-          //analyzing if some samples are still on treatment on other branches.
-          bool isFinished=true;
-          for(int i=0;i<_execIds.size() && isFinished;i++)
-            isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
-          if(isFinished)
-            {
-              try 
+      return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
+    case FINALIZE_NODE:
+      return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
+    default:
+      YASSERT(false);
+    }
+  return YACS::NOEVENT;
+}
+
+YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
+{
+  _execNodes[id]->exUpdateState();
+  _nbOfEltConsumed++;
+  _initializingCounter--;
+  _currentIndex++;
+  if (_initializingCounter == 0)
+    _initNode->setState(DONE);
+  return YACS::NOEVENT;
+}
+
+/*!
+ * \param [in] isNormalFinish - if true
+ */
+YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
+{
+  _currentIndex++;
+  exUpdateProgress();
+  if(isNormalFinish)
+    {
+      int globalId(_execIds[id]);
+      if(_passedData)
+        globalId=_passedData->toAbsId(globalId);
+      sendEvent2("progress_ok",&globalId);
+      storeOutValsInSeqForOutOfScopeUse(globalId,id);
+    }
+  else
+    {
+      int globalId(_execIds[id]);
+      if(_passedData)
+        globalId=_passedData->toAbsId(globalId);
+      sendEvent2("progress_ko",&globalId);
+    }
+  //
+  if(_execCurrentId==getFinishedId())
+    {//No more elements of _dataPortToDispatch to treat
+      _execIds[id]=NOT_RUNNING_BRANCH_ID;
+      //analyzing if some samples are still on treatment on other branches.
+      bool isFinished(true);
+      for(int i=0;i<_execIds.size() && isFinished;i++)
+        isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
+      if(isFinished)
+        {
+          try
+          {
+              if(_failedCounter!=0)
+                {// case of keepgoing mode + a failed
+                  std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
+                  DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
+                  setState(YACS::FAILED);
+                  return YACS::ABORT;
+                }
+              pushAllSequenceValues();
+
+              if (_node)
                 {
-                  pushAllSequenceValues();
-  
-                  if (_node)
-                    {
-                      _node->setState(YACS::DONE);
-     
-                      ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
-                      if (compNode)
-                        {
-                          std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
-                          std::list<Node *>::iterator iter=aChldn.begin();
-                          for(;iter!=aChldn.end();iter++)
-                            (*iter)->setState(YACS::DONE);
-                        }
-                    }
+                  _node->setState(YACS::DONE);
 
-                  if (_finalizeNode == NULL)
-                    {
-                      // No finalize node, we just finish the loop at the end of exec nodes execution
-                      setState(YACS::DONE);
-                      return YACS::FINISH;
-                    }
-                  else
+                  ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
+                  if (compNode)
                     {
-                      // Run the finalize nodes, the loop will be done only when they all finish
-                      _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
-                      for (int i=0 ; i<_execIds.size() ; i++)
-                        {
-                          YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
-                          DEBTRACE("Launching finalize node for branch " << i);
-                          _execFinalizeNodes[i]->exUpdateState();
-                          _unfinishedCounter++;
-                        }
-                      return YACS::NOEVENT;
+                      std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
+                      std::list<Node *>::iterator iter=aChldn.begin();
+                      for(;iter!=aChldn.end();iter++)
+                        (*iter)->setState(YACS::DONE);
                     }
                 }
-              catch(YACS::Exception& ex)
+
+              if (_finalizeNode == NULL)
                 {
-                  DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
-                  //no way to push results : put following nodes in FAILED state
-                  //TODO could be more fine grain : put only concerned nodes in FAILED state
-                  exForwardFailed();
-                  setState(YACS::ERROR);
-                  return YACS::ABORT;
+                  // No finalize node, we just finish the loop at the end of exec nodes execution
+                  setState(YACS::DONE);
+                  return YACS::FINISH;
                 }
-            }
-        }
-      else if(_state == YACS::ACTIVATED)
-        {//more elements to do and loop still activated
-          _execIds[id]=_execCurrentId;
-          node->init(false);
-          _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
-          node->exUpdateState();
-          forwardExecStateToOriginalBody(node);
-          _nbOfEltConsumed++;
-        }
-      else
-        {//elements to process and loop no more activated
-          DEBTRACE("foreach loop state " << _state);
-        }
-      break;
-    case FINALIZE_NODE:
-    {
-      DEBTRACE("Finalize node finished on branch " << id);
-      _unfinishedCounter--;
-      _currentIndex++;
-      exUpdateProgress();
-      DEBTRACE(_unfinishedCounter << " finalize nodes still running");
-      if (_unfinishedCounter == 0)
-        {
-          _finalizeNode->setState(YACS::DONE);
-          setState(YACS::DONE);
-          return YACS::FINISH;
+              else
+                {
+                  // Run the finalize nodes, the loop will be done only when they all finish
+                  _unfinishedCounter = 0;  // This counter indicates how many branches are not finished
+                  for (int i=0 ; i<_execIds.size() ; i++)
+                    {
+                      YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
+                      DEBTRACE("Launching finalize node for branch " << i);
+                      _execFinalizeNodes[i]->exUpdateState();
+                      _unfinishedCounter++;
+                    }
+                  return YACS::NOEVENT;
+                }
+          }
+          catch(YACS::Exception& ex)
+          {
+              DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
+              //no way to push results : put following nodes in FAILED state
+              //TODO could be more fine grain : put only concerned nodes in FAILED state
+              exForwardFailed();
+              setState(YACS::ERROR);
+              return YACS::ABORT;
+          }
         }
-      else
-        return YACS::NOEVENT;
-      break;
     }
-    default:
-      YASSERT(false);
+  else if(_state == YACS::ACTIVATED)
+    {//more elements to do and loop still activated
+      _execIds[id]=_execCurrentId;
+      node->init(false);
+      int posInAbs(_execCurrentId);
+      if(_passedData)
+        posInAbs=_passedData->toAbsId(_execCurrentId);
+      _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
+      _execCurrentId++;
+      node->exUpdateState();
+      forwardExecStateToOriginalBody(node);
+      _nbOfEltConsumed++;
+    }
+  else
+    {//elements to process and loop no more activated
+      DEBTRACE("foreach loop state " << _state);
     }
   return YACS::NOEVENT;
 }
 
+YACS::Event ForEachLoop::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
+{
+  DEBTRACE("Finalize node finished on branch " << id);
+  _unfinishedCounter--;
+  _currentIndex++;
+  exUpdateProgress();
+  DEBTRACE(_unfinishedCounter << " finalize nodes still running");
+  if (_unfinishedCounter == 0)
+    {
+      _finalizeNode->setState(YACS::DONE);
+      setState(YACS::DONE);
+      return YACS::FINISH;
+    }
+  else
+    return YACS::NOEVENT;
+}
+
+YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
+{
+  unsigned int id;
+  DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
+  // TODO: deal with keepgoing without the dependency to Executor
+  if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
+    return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
+  else
+    {
+      _failedCounter++;
+      return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
+    }
+}
+
+void ForEachLoop::InterceptorizeNameOfPort(std::string& portName)
+{
+  std::replace_if(portName.begin(), portName.end(), std::bind1st(std::equal_to<char>(), '.'), '_');
+  portName += INTERCEPTOR_STR;
+}
+
 void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
 {
   DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
@@ -669,9 +924,16 @@ void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort
         }
       else
         {
-          TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
-          AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
-          InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
+          TypeCode *tcTrad((YACS::ENGINE::TypeCode*)finalTarget->edGetType()->subContentType(getFEDeltaBetween(port.first,finalTarget)));
+          TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",tcTrad);
+          // The out going ports belong to the ForEachLoop, whereas
+          // the delegated port belongs to a node child of the ForEachLoop.
+          // The name of the delegated port contains dots (bloc.node.outport),
+          // whereas the name of the out going port shouldn't do.
+          std::string outputPortName(getPortName(port.first));
+          InterceptorizeNameOfPort(outputPortName);
+          AnySplitOutputPort *newPort(new AnySplitOutputPort(outputPortName,this,newTc));
+          InterceptorInputPort *intercptor(new InterceptorInputPort(outputPortName + "_in",this,tcTrad));
           intercptor->setRepr(newPort);
           newTc->decrRef();
           newPort->addRepr(port.first,intercptor);
@@ -760,6 +1022,14 @@ void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
     }
 }
 
+int ForEachLoop::getFinishedId()
+{
+  if(!_passedData)
+    return _splitterNode.getNumberOfElements();
+  else
+    return _passedData->getNumberOfElementsToDo();
+}
+
 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
 {
   _execVals.resize(_outGoingPorts.size());
@@ -786,7 +1056,8 @@ void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
       //AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
       OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
       DEBTRACE( portOut->getName() );
-      AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
+      TypeCode *tc((TypeCode *)(*iter)->edGetType()->contentType());
+      AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,tc);
       portOut->addInPort(interceptor);
       _execOutGoingPorts[branchNb].push_back(interceptor);
     }
@@ -795,8 +1066,13 @@ void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
 {
-  if(isInMyDescendance(start->getNode())==_node)
-    throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
+  DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
+  if(end->getNode() == &_splitterNode)
+    throw Exception("Illegal link within a foreach loop: \
+the 'SmplsCollection' port cannot be linked within the scope of the loop.");
+  if(end == &_nbOfBranches)
+    throw Exception("Illegal link within a foreach loop: \
+the 'nbBranches' port cannot be linked within the scope of the loop.");
 }
 
 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
@@ -844,7 +1120,7 @@ void ForEachLoop::resetState(int level)
 
 std::string ForEachLoop::getProgress() const
 {
-  int nbElems = _splitterNode.getNumberOfElements();
+  int nbElems(getNbOfElementsToBeProcessed());
   std::stringstream aProgress;
   if (nbElems > 0)
     aProgress << _currentIndex << "/" << nbElems;
@@ -852,3 +1128,146 @@ std::string ForEachLoop::getProgress() const
     aProgress << "0";
   return aProgress.str();
 }
+
+//! Get the progress weight for all elementary nodes
+/*!
+ * Only elementary nodes have weight. For each node in the loop, the weight done is multiplied
+ * by the number of elements done and the weight total by the number total of elements
+ */
+list<ProgressWeight> ForEachLoop::getProgressWeight() const
+{
+  list<ProgressWeight> ret;
+  list<Node *> setOfNode=edGetDirectDescendants();
+  int elemDone=getCurrentIndex();
+  int elemTotal=getNbOfElementsToBeProcessed();
+  for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
+    {
+      list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
+      for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
+        {
+          (*iter).weightDone=((*iter).weightTotal) * elemDone;
+          (*iter).weightTotal*=elemTotal;
+        }
+      ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
+    }
+  return ret;
+}
+
+int ForEachLoop::getNbOfElementsToBeProcessed() const
+{
+  int nbBranches = _nbOfBranches.getIntValue();
+  return _splitterNode.getNumberOfElements()
+         + (_initNode ? nbBranches:0)
+         + (_finalizeNode ? nbBranches:0) ;
+}
+
+/*!
+ * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
+ * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
+ * This method has one input \a execut and 3 outputs.
+ *
+ * \param [in] execut - The single input is for threadsafety reasons because this method can be called safely during the execution of \a this.
+ * \param [out] outputs - For each output ports in \a this linked with nodes sharing the same father than \a this the passed results are stored.
+ *                        All of the items in \a outputs have the same size.
+ * \param [out] nameOfOutputs - The array with same size than \a outputs, that tells for each item in outputs the output port it refers to.
+ * \return the list of ids among \c this->edGetSeqOfSamplesPort() that run successfully. The length of this returned array will be the length of all
+ *         SequenceAny objects contained in \a outputs.
+ *
+ * \sa edGetSeqOfSamplesPort
+ */
+std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
+  if(_execVals.empty())
+    return std::vector<unsigned int>();
+  if(_execOutGoingPorts.empty())
+    return std::vector<unsigned int>();
+  std::size_t sz(_execVals.size());
+  outputs.resize(sz);
+  nameOfOutputs.resize(sz);
+  const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
+  for(std::size_t i=0;i<sz;i++)
+    {
+      outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
+      nameOfOutputs[i]=ports[i]->getName();
+    }
+  return _execVals[0]->getSetItems();
+}
+
+/*!
+ * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
+ * getPassedResults method.
+ */
+void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
+{
+  delete _passedData;
+  _failedCounter=0;
+  _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
+}
+
+int ForEachLoop::getFEDeltaBetween(OutPort *start, InPort *end)
+{
+  Node *ns(start->getNode()),*ne(end->getNode());
+  ComposedNode *co(getLowestCommonAncestor(ns,ne));
+  int ret(0);
+  Node *work(ns);
+  while(work!=co)
+    {
+      ForEachLoop *isFE(dynamic_cast<ForEachLoop *>(work));
+      if(isFE)
+        ret++;
+      work=work->getFather();
+    }
+  if(dynamic_cast<AnySplitOutputPort *>(start))
+    ret--;
+  return ret;
+}
+
+/*!
+ * This method is used to obtain the values already processed by the ForEachLoop.
+ * A new ForEachLoopPassedData object is returned. You have to delete it.
+ */
+ForEachLoopPassedData* ForEachLoop::getProcessedData()const
+{
+  std::vector<SequenceAny *> outputs;
+  std::vector<std::string> nameOfOutputs;
+  if(_execVals.empty() || _execOutGoingPorts.empty())
+    return new ForEachLoopPassedData(std::vector<unsigned int>(), outputs, nameOfOutputs);
+  std::size_t sz(_execVals.size());
+  outputs.resize(sz);
+  nameOfOutputs.resize(sz);
+  const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
+  for(std::size_t i=0;i<sz;i++)
+    {
+      outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
+      nameOfOutputs[i]=ports[i]->getName();
+    }
+  return new ForEachLoopPassedData(_execVals[0]->getSetItems(), outputs, nameOfOutputs);
+}
+
+void ForEachLoop::setProcessedData(ForEachLoopPassedData* processedData)
+{
+  if(_passedData)
+    delete _passedData;
+  _passedData = processedData;
+}
+
+/*!
+ * \param portName : "interceptorized" name of port.
+ */
+const YACS::ENGINE::TypeCode* ForEachLoop::getOutputPortType(const std::string& portName)const
+{
+  const YACS::ENGINE::TypeCode* ret=NULL;
+  vector<AnySplitOutputPort *>::const_iterator it;
+  for(it=_outGoingPorts.begin();it!=_outGoingPorts.end() && ret==NULL;it++)
+  {
+    std::string originalPortName(getPortName(*it));
+    //InterceptorizeNameOfPort(originalPortName);
+    DEBTRACE("ForEachLoop::getOutputPortType compare " << portName << " == " << originalPortName);
+    if(originalPortName == portName)
+    {
+      ret = (*it)->edGetType()->contentType();
+    }
+  }
+  return ret;
+}