#include "AutoLocker.hxx"
#include <iostream>
+#include <iomanip>
#include <sstream>
+#include <algorithm> // std::replace_if
//#define _DEVDEBUG_
#include "YacsTrace.hxx"
switch (val->getType()->kind())
{
case Double:
- xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
+ xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
break;
case Int:
xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
_loop->setState(YACS::DONE);
}
+ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
+{
+ std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
+ if(sz1!=sz2)
+ throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
+ for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+ {
+ const SequenceAny *elt(*it);
+ if(elt)
+ if(sz!=(std::size_t)elt->size())
+ throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
+ }
+ for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+ {
+ SequenceAny *elt(*it);
+ if(elt)
+ elt->incrRef();
+ }
+}
+
+ForEachLoopPassedData::~ForEachLoopPassedData()
+{
+ for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+ {
+ SequenceAny *elt(*it);
+ if(elt)
+ elt->decrRef();
+ }
+}
+
+void ForEachLoopPassedData::init()
+{
+ _flagsIds.clear();
+}
+
+void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
+{
+ if(nbOfElts<0)
+ throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
+ std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
+ if(nbOfElts2<sizeExp)
+ throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
+ for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+ {
+ if((*it)>=nbOfElts2)
+ throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
+ }
+ _flagsIds.resize(nbOfElts);
+ std::fill(_flagsIds.begin(),_flagsIds.end(),false);
+ for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+ {
+ if(*it<nbOfElts)
+ {
+ if(!_flagsIds[*it])
+ _flagsIds[*it]=true;
+ else
+ {
+ std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
+ throw YACS::Exception(oss.str());
+ }
+ }
+ else
+ {
+ std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
+ throw YACS::Exception(oss.str());
+ }
+ }
+}
+
+void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
+{
+ std::size_t sz(_nameOfOutputs.size());
+ if(sz!=ports.size())
+ throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
+ for(std::size_t i=0;i<sz;i++)
+ {
+ AnyInputPort *elt(ports[i]);
+ if(!elt)
+ throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
+ if(_nameOfOutputs[i]!=elt->getName())
+ {
+ std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
+ throw YACS::Exception(oss.str());
+ }
+ }
+}
+
+/*!
+ * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
+ */
+int ForEachLoopPassedData::toAbsId(int localId) const
+{
+ if(localId<0)
+ throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
+ int ret(0),curLocId(0);
+ for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+ {
+ if(!*it)
+ {
+ if(localId==curLocId)
+ return ret;
+ curLocId++;
+ }
+ }
+ throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
+}
+
+/*!
+ * Equivalent to toAbsId except that only ON are considered here.
+ */
+int ForEachLoopPassedData::toAbsIdNot(int localId) const
+{
+ if(localId<0)
+ throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
+ int ret(0),curLocId(0);
+ for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+ {
+ if(*it)//<- diff is here !
+ {
+ if(localId==curLocId)
+ return ret;
+ curLocId++;
+ }
+ }
+ throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
+}
+
+int ForEachLoopPassedData::getNumberOfElementsToDo() const
+{
+ std::size_t nbAllElts(_flagsIds.size());
+ std::size_t ret(nbAllElts-_passedIds.size());
+ return ret;
+}
+
+void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
+{
+ std::size_t sz(execVals.size());
+ if(_passedOutputs.size()!=sz)
+ throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
+ for(std::size_t i=0;i<sz;i++)
+ {
+ SequenceAny *elt(_passedOutputs[i]);
+ SequenceAny *eltDestination(execVals[i]);
+ if(!elt)
+ throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
+ unsigned int szOfElt(elt->size());
+ for(unsigned int j=0;j<szOfElt;j++)
+ {
+ AnyPtr elt1((*elt)[j]);
+ int jAbs(toAbsIdNot(j));
+ eltDestination->setEltAtRank(jAbs,elt1);
+ }
+ }
+}
+
ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
_splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
- _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+ _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
{
}
ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
_splitterNode(other._splitterNode,this),
- _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+ _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
{
int i=0;
if(!editionOnly)
{
AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
- temp->addRepr(getOutPort((*iter2)->getName()),interc);
+ temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
interc->setRepr(temp);
_outGoingPorts.push_back(temp);
_intecptrsForOutGoingPorts.push_back(interc);
delete *iter;
for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
delete *iter2;
+ delete _passedData;
}
void ForEachLoop::init(bool start)
cleanDynGraph();
_currentIndex = 0;
exUpdateProgress();
+ if(_passedData)
+ _passedData->init();
}
void ForEachLoop::exUpdateState()
DEBTRACE("ForEachLoop::exUpdateState");
if(_state == YACS::DISABLED)
return;
+ if(_state == YACS::DONE)
+ return;
if(_inGate.exIsReady())
{
//internal graph update
int i;
- int nbOfBr=_nbOfBranches.getIntValue();
- int nbOfElts=_splitterNode.getNumberOfElements();
+ int nbOfBr(_nbOfBranches.getIntValue()),nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
+ if(_passedData)
+ {
+ _passedData->checkCompatibilyWithNb(nbOfElts);
+ nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
+ }
+ int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
DEBTRACE("nbOfElts=" << nbOfElts);
DEBTRACE("nbOfBr=" << nbOfBr);
- if(nbOfElts==0)
+ if(nbOfEltsToDo==0)
{
prepareSequenceValues(0);
delete _nodeForSpecialCases;
setState(YACS::ACTIVATED);
return ;
}
- if(nbOfBr>nbOfElts)
- nbOfBr=nbOfElts;
+ if(nbOfBr>nbOfEltsToDo)
+ nbOfBr=nbOfEltsToDo;
_execNodes.resize(nbOfBr);
_execIds.resize(nbOfBr);
_execOutGoingPorts.resize(nbOfBr);
{
for(i=0;i<nbOfBr;i++)
{
- DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
- _execIds[i]=_execCurrentId;
DEBTRACE( "-------------- 2" );
vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
if(_initNode)
DEBTRACE( "-------------- 5" );
createOutputOutOfScopeInterceptors(i);
DEBTRACE( "-------------- 6" );
- _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
+ }
+ for(i=0;i<nbOfBr;i++)
+ {
+ DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
+ _execIds[i]=_execCurrentId;
+ int posInAbs(_execCurrentId);
+ if(_passedData)
+ posInAbs=_passedData->toAbsId(_execCurrentId);
+ _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
+ _execCurrentId++;
DEBTRACE( "-------------- 7" );
- }
+ }
+ if(_passedData)
+ {
+ _passedData->checkLevel2(_execOutGoingPorts[0]);
+ _passedData->assignAlreadyDone(_execVals);
+ }
}
catch(YACS::Exception& ex)
{
_execNodes[id]->exUpdateState();
_nbOfEltConsumed++;
_initializingCounter--;
+ _currentIndex++;
if (_initializingCounter == 0)
_initNode->setState(DONE);
return YACS::NOEVENT;
_currentIndex++;
exUpdateProgress();
if(isNormalFinish)
- storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
- if(_execCurrentId==_splitterNode.getNumberOfElements())
+ {
+ int globalId(_execIds[id]);
+ if(_passedData)
+ globalId=_passedData->toAbsId(globalId);
+ storeOutValsInSeqForOutOfScopeUse(globalId,id);
+ }
+ //
+ if(_execCurrentId==getFinishedId())
{//No more elements of _dataPortToDispatch to treat
_execIds[id]=NOT_RUNNING_BRANCH_ID;
//analyzing if some samples are still on treatment on other branches.
try
{
if(_failedCounter!=0)
- {
+ {// case of keepgoing mode + a failed
std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
- throw YACS::Exception(oss.str());
+ DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom : "<< oss.str());
+ setState(YACS::FAILED);
+ return YACS::ABORT;
}
pushAllSequenceValues();
{//more elements to do and loop still activated
_execIds[id]=_execCurrentId;
node->init(false);
- _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
+ int posInAbs(_execCurrentId);
+ if(_passedData)
+ posInAbs=_passedData->toAbsId(_execCurrentId);
+ _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
+ _execCurrentId++;
node->exUpdateState();
forwardExecStateToOriginalBody(node);
_nbOfEltConsumed++;
else
{
TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
- AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
- InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
+ // The out going ports belong to the ForEachLoop, whereas
+ // the delegated port belongs to a node child of the ForEachLoop.
+ // The name of the delegated port contains dots (bloc.node.outport),
+ // whereas the name of the out going port shouldn't do.
+ std::string outputPortName = getPortName(port.first);
+ std::replace_if (outputPortName.begin(), outputPortName.end(),
+ std::bind1st(std::equal_to<char>(), '.'), '_');
+ outputPortName += "_interceptor";
+ AnySplitOutputPort *newPort=new AnySplitOutputPort(outputPortName,this,newTc);
+ InterceptorInputPort *intercptor=new InterceptorInputPort(outputPortName + "_in",this,port.first->edGetType());
intercptor->setRepr(newPort);
newTc->decrRef();
newPort->addRepr(port.first,intercptor);
}
}
+int ForEachLoop::getFinishedId()
+{
+ if(!_passedData)
+ return _splitterNode.getNumberOfElements();
+ else
+ return _passedData->getNumberOfElementsToDo();
+}
+
void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
{
_execVals.resize(_outGoingPorts.size());
void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
{
- if(isInMyDescendance(start->getNode())==_node)
- throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
+ DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
+ if(end->getNode() == &_splitterNode)
+ throw Exception("Illegal link within a foreach loop: \
+the 'SmplsCollection' port cannot be linked within the scope of the loop.");
+ if(end == &_nbOfBranches)
+ throw Exception("Illegal link within a foreach loop: \
+the 'nbBranches' port cannot be linked within the scope of the loop.");
}
std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
std::string ForEachLoop::getProgress() const
{
- int nbElems = _splitterNode.getNumberOfElements();
+ int nbElems(getNbOfElementsToBeProcessed());
std::stringstream aProgress;
if (nbElems > 0)
aProgress << _currentIndex << "/" << nbElems;
return aProgress.str();
}
+//! Get the progress weight for all elementary nodes
+/*!
+ * Only elementary nodes have weight. For each node in the loop, the weight done is multiplied
+ * by the number of elements done and the weight total by the number total of elements
+ */
+list<ProgressWeight> ForEachLoop::getProgressWeight() const
+{
+ list<ProgressWeight> ret;
+ list<Node *> setOfNode=edGetDirectDescendants();
+ int elemDone=getCurrentIndex();
+ int elemTotal=getNbOfElementsToBeProcessed();
+ for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
+ {
+ list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
+ for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
+ {
+ (*iter).weightDone=((*iter).weightTotal) * elemDone;
+ (*iter).weightTotal*=elemTotal;
+ }
+ ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
+ }
+ return ret;
+}
+
+int ForEachLoop::getNbOfElementsToBeProcessed() const
+{
+ int nbBranches = _nbOfBranches.getIntValue();
+ return _splitterNode.getNumberOfElements()
+ + (_initNode ? nbBranches:0)
+ + (_finalizeNode ? nbBranches:0) ;
+}
+
/*!
* This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
* The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
return _execVals[0]->getSetItems();
}
+/*!
+ * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
+ * getPassedResults method.
+ */
+void ForEachLoop::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
+{
+ delete _passedData;
+ _failedCounter=0;
+ _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
+}
+