Salome HOME
Manage initnode and finalizenode in the foreachloop progress bar.
[modules/yacs.git] / src / engine / ForEachLoop.cxx
index 431fee9f51eb8cc0b49d6bb6964a395135e8b9f4..3b9dec62cb8baa4e7fe63f4971cda31e067cb6ab 100644 (file)
@@ -25,7 +25,9 @@
 #include "AutoLocker.hxx"
 
 #include <iostream>
+#include <iomanip>
 #include <sstream>
+#include <algorithm>    // std::replace_if
 
 //#define _DEVDEBUG_
 #include "YacsTrace.hxx"
@@ -175,7 +177,7 @@ std::string SeqAnyInputPort::dump()
       switch (val->getType()->kind())
         {
         case Double:
-          xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
+          xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
           break;
         case Int:
           xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
@@ -294,15 +296,170 @@ void FakeNodeForForEachLoop::finished()
   _loop->setState(YACS::DONE);
 }
 
+ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
+{
+  std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
+  if(sz1!=sz2)
+    throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
+  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+    {
+      const SequenceAny *elt(*it);
+      if(elt)
+        if(sz!=(std::size_t)elt->size())
+          throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
+    }
+  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+    {
+      SequenceAny *elt(*it);
+      if(elt)
+        elt->incrRef();
+    }
+}
+
+ForEachLoopPassedData::~ForEachLoopPassedData()
+{
+  for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+    {
+      SequenceAny *elt(*it);
+      if(elt)
+        elt->decrRef();
+    }
+}
+
+void ForEachLoopPassedData::init()
+{
+  _flagsIds.clear();
+}
+
+void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
+{
+  if(nbOfElts<0)
+    throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
+  std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
+  if(nbOfElts2<sizeExp)
+    throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
+  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+    {
+      if((*it)>=nbOfElts2)
+        throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
+    }
+  _flagsIds.resize(nbOfElts);
+  std::fill(_flagsIds.begin(),_flagsIds.end(),false);
+  for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+    {
+      if(*it<nbOfElts)
+        {
+          if(!_flagsIds[*it])
+            _flagsIds[*it]=true;
+          else
+            {
+              std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
+              throw YACS::Exception(oss.str());
+            }
+        }
+      else
+        {
+          std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," <<  nbOfElts << ") !";
+          throw YACS::Exception(oss.str());
+        }
+    }
+}
+
+void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
+{
+  std::size_t sz(_nameOfOutputs.size());
+  if(sz!=ports.size())
+    throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
+  for(std::size_t i=0;i<sz;i++)
+    {
+      AnyInputPort *elt(ports[i]);
+      if(!elt)
+        throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
+      if(_nameOfOutputs[i]!=elt->getName())
+        {
+          std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
+          throw YACS::Exception(oss.str());
+        }
+    }
+}
+
+/*!
+ * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
+ */
+int ForEachLoopPassedData::toAbsId(int localId) const
+{
+  if(localId<0)
+    throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
+  int ret(0),curLocId(0);
+  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+    {
+      if(!*it)
+        {
+          if(localId==curLocId)
+            return ret;
+          curLocId++;
+        }
+    }
+  throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
+}
+
+/*!
+ * Equivalent to toAbsId except that only ON are considered here.
+ */
+int ForEachLoopPassedData::toAbsIdNot(int localId) const
+{
+  if(localId<0)
+    throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
+  int ret(0),curLocId(0);
+  for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+    {
+      if(*it)//<- diff is here !
+        {
+          if(localId==curLocId)
+            return ret;
+          curLocId++;
+        }
+    }
+  throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
+}
+
+int ForEachLoopPassedData::getNumberOfElementsToDo() const
+{
+  std::size_t nbAllElts(_flagsIds.size());
+  std::size_t ret(nbAllElts-_passedIds.size());
+  return ret;
+}
+
+void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
+{
+  std::size_t sz(execVals.size());
+  if(_passedOutputs.size()!=sz)
+    throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
+  for(std::size_t i=0;i<sz;i++)
+    {
+      SequenceAny *elt(_passedOutputs[i]);
+      SequenceAny *eltDestination(execVals[i]);
+      if(!elt)
+        throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
+      unsigned int szOfElt(elt->size());
+      for(unsigned int j=0;j<szOfElt;j++)
+        {
+          AnyPtr elt1((*elt)[j]);
+          int jAbs(toAbsIdNot(j));
+          eltDestination->setEltAtRank(jAbs,elt1);
+        }
+    }
+}
+
 ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
                                                                                 _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
-                                                                                _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+                                                                                _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
 {
 }
 
 ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
                                                                                            _splitterNode(other._splitterNode,this),
-                                                                                           _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+                                                                                           _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
 {
   int i=0;
   if(!editionOnly)
@@ -310,7 +467,7 @@ ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool ed
       {
         AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
         InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
-        temp->addRepr(getOutPort((*iter2)->getName()),interc);
+        temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
         interc->setRepr(temp);
         _outGoingPorts.push_back(temp);
         _intecptrsForOutGoingPorts.push_back(interc);
@@ -329,6 +486,7 @@ ForEachLoop::~ForEachLoop()
     delete *iter;
   for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
     delete *iter2;
+  delete _passedData;
 }
 
 void ForEachLoop::init(bool start)
@@ -339,6 +497,8 @@ void ForEachLoop::init(bool start)
   cleanDynGraph();
   _currentIndex = 0;
   exUpdateProgress();
+  if(_passedData)
+    _passedData->init();
 }
 
 void ForEachLoop::exUpdateState()
@@ -346,17 +506,24 @@ void ForEachLoop::exUpdateState()
   DEBTRACE("ForEachLoop::exUpdateState");
   if(_state == YACS::DISABLED)
     return;
+  if(_state == YACS::DONE)
+    return;
   if(_inGate.exIsReady())
     {
       //internal graph update
       int i;
-      int nbOfBr=_nbOfBranches.getIntValue();
-      int nbOfElts=_splitterNode.getNumberOfElements();
+      int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
+      if(_passedData)
+        {
+          _passedData->checkCompatibilyWithNb(nbOfElts);
+          nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
+        }
+      int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
 
       DEBTRACE("nbOfElts=" << nbOfElts);
       DEBTRACE("nbOfBr=" << nbOfBr);
 
-      if(nbOfElts==0)
+      if(nbOfEltsToDo==0)
         {
           prepareSequenceValues(0);
           delete _nodeForSpecialCases;
@@ -371,8 +538,8 @@ void ForEachLoop::exUpdateState()
           setState(YACS::ACTIVATED);
           return ;
         }
-      if(nbOfBr>nbOfElts)
-        nbOfBr=nbOfElts;
+      if(nbOfBr>nbOfEltsToDo)
+        nbOfBr=nbOfEltsToDo;
       _execNodes.resize(nbOfBr);
       _execIds.resize(nbOfBr);
       _execOutGoingPorts.resize(nbOfBr);
@@ -394,8 +561,6 @@ void ForEachLoop::exUpdateState()
         {
           for(i=0;i<nbOfBr;i++)
             {
-              DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
-              _execIds[i]=_execCurrentId;
               DEBTRACE( "-------------- 2" );
               vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
               if(_initNode)
@@ -408,9 +573,23 @@ void ForEachLoop::exUpdateState()
               DEBTRACE( "-------------- 5" );
               createOutputOutOfScopeInterceptors(i);
               DEBTRACE( "-------------- 6" );
-              _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
+            }
+          for(i=0;i<nbOfBr;i++)
+            {
+              DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
+              _execIds[i]=_execCurrentId;
+              int posInAbs(_execCurrentId);
+              if(_passedData)
+                posInAbs=_passedData->toAbsId(_execCurrentId);
+              _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
+              _execCurrentId++;
               DEBTRACE( "-------------- 7" );
-            } 
+            }
+          if(_passedData)
+            {
+              _passedData->checkLevel2(_execOutGoingPorts[0]);
+              _passedData->assignAlreadyDone(_execVals);
+            }
         }
       catch(YACS::Exception& ex)
         {
@@ -562,6 +741,7 @@ YACS::Event ForEachLoop::updateStateForInitNodeOnFinishedEventFrom(Node *node, u
   _execNodes[id]->exUpdateState();
   _nbOfEltConsumed++;
   _initializingCounter--;
+  _currentIndex++;
   if (_initializingCounter == 0)
     _initNode->setState(DONE);
   return YACS::NOEVENT;
@@ -575,8 +755,14 @@ YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, u
   _currentIndex++;
   exUpdateProgress();
   if(isNormalFinish)
-    storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
-  if(_execCurrentId==_splitterNode.getNumberOfElements())
+    {
+      int globalId(_execIds[id]);
+      if(_passedData)
+        globalId=_passedData->toAbsId(globalId);
+      storeOutValsInSeqForOutOfScopeUse(globalId,id);
+    }
+  //
+  if(_execCurrentId==getFinishedId())
     {//No more elements of _dataPortToDispatch to treat
       _execIds[id]=NOT_RUNNING_BRANCH_ID;
       //analyzing if some samples are still on treatment on other branches.
@@ -588,9 +774,11 @@ YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, u
           try
           {
               if(_failedCounter!=0)
-                {
+                {// case of keepgoing mode + a failed
                   std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
-                  throw YACS::Exception(oss.str());
+                  DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
+                  setState(YACS::FAILED);
+                  return YACS::ABORT;
                 }
               pushAllSequenceValues();
 
@@ -643,7 +831,11 @@ YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, u
     {//more elements to do and loop still activated
       _execIds[id]=_execCurrentId;
       node->init(false);
-      _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
+      int posInAbs(_execCurrentId);
+      if(_passedData)
+        posInAbs=_passedData->toAbsId(_execCurrentId);
+      _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
+      _execCurrentId++;
       node->exUpdateState();
       forwardExecStateToOriginalBody(node);
       _nbOfEltConsumed++;
@@ -708,8 +900,16 @@ void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort
       else
         {
           TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
-          AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
-          InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
+          // The out going ports belong to the ForEachLoop, whereas
+          // the delegated port belongs to a node child of the ForEachLoop.
+          // The name of the delegated port contains dots (bloc.node.outport),
+          // whereas the name of the out going port shouldn't do.
+          std::string outputPortName = getPortName(port.first);
+          std::replace_if (outputPortName.begin(), outputPortName.end(),
+                           std::bind1st(std::equal_to<char>(), '.'), '_');
+          outputPortName += "_interceptor";
+          AnySplitOutputPort *newPort=new AnySplitOutputPort(outputPortName,this,newTc);
+          InterceptorInputPort *intercptor=new InterceptorInputPort(outputPortName + "_in",this,port.first->edGetType());
           intercptor->setRepr(newPort);
           newTc->decrRef();
           newPort->addRepr(port.first,intercptor);
@@ -798,6 +998,14 @@ void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
     }
 }
 
+int ForEachLoop::getFinishedId()
+{
+  if(!_passedData)
+    return _splitterNode.getNumberOfElements();
+  else
+    return _passedData->getNumberOfElementsToDo();
+}
+
 void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
 {
   _execVals.resize(_outGoingPorts.size());
@@ -833,8 +1041,13 @@ void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
 void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
 {
-  if(isInMyDescendance(start->getNode())==_node)
-    throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
+  DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
+  if(end->getNode() == &_splitterNode)
+    throw Exception("Illegal link within a foreach loop: \
+the 'SmplsCollection' port cannot be linked within the scope of the loop.");
+  if(end == &_nbOfBranches)
+    throw Exception("Illegal link within a foreach loop: \
+the 'nbBranches' port cannot be linked within the scope of the loop.");
 }
 
 std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
@@ -882,7 +1095,7 @@ void ForEachLoop::resetState(int level)
 
 std::string ForEachLoop::getProgress() const
 {
-  int nbElems = _splitterNode.getNumberOfElements();
+  int nbElems(getNbOfElementsToBeProcessed());
   std::stringstream aProgress;
   if (nbElems > 0)
     aProgress << _currentIndex << "/" << nbElems;
@@ -891,6 +1104,38 @@ std::string ForEachLoop::getProgress() const
   return aProgress.str();
 }
 
+//! Get the progress weight for all elementary nodes
+/*!
+ * Only elementary nodes have weight. For each node in the loop, the weight done is multiplied
+ * by the number of elements done and the weight total by the number total of elements
+ */
+list<ProgressWeight> ForEachLoop::getProgressWeight() const
+{
+  list<ProgressWeight> ret;
+  list<Node *> setOfNode=edGetDirectDescendants();
+  int elemDone=getCurrentIndex();
+  int elemTotal=getNbOfElementsToBeProcessed();
+  for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
+    {
+      list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
+      for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
+        {
+          (*iter).weightDone=((*iter).weightTotal) * elemDone;
+          (*iter).weightTotal*=elemTotal;
+        }
+      ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
+    }
+  return ret;
+}
+
+int ForEachLoop::getNbOfElementsToBeProcessed() const
+{
+  int nbBranches = _nbOfBranches.getIntValue();
+  return _splitterNode.getNumberOfElements()
+         + (_initNode ? nbBranches:0)
+         + (_finalizeNode ? nbBranches:0) ;
+}
+
 /*!
  * This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
  * The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
@@ -922,3 +1167,14 @@ std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::v
   return _execVals[0]->getSetItems();
 }
 
+/*!
+ * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
+ * getPassedResults method.
+ */
+void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
+{
+  delete _passedData;
+  _failedCounter=0;
+  _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
+}
+