-// Copyright (C) 2006-2014 CEA/DEN, EDF R&D
+// Copyright (C) 2006-2020 CEA/DEN, EDF R&D
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
#include "AutoLocker.hxx"
#include <iostream>
+#include <iomanip>
#include <sstream>
+#include <algorithm> // std::replace_if
+#include <functional> // std::bind
//#define _DEVDEBUG_
#include "YacsTrace.hxx"
+#ifdef WIN32
+#include <functional>
+#endif
+
using namespace YACS::ENGINE;
using namespace std;
const char SplitterNode::NAME_OF_SEQUENCE_INPUT[]="SmplsCollection";
-const char ForEachLoop::NAME_OF_SPLITTERNODE[]="splitter";
+const char ForEachLoopGen::NAME_OF_SPLITTERNODE[]="splitter";
+
+const int ForEachLoopGen::NOT_RUNNING_BRANCH_ID=-1;
-const int ForEachLoop::NOT_RUNNING_BRANCH_ID=-1;
+const char ForEachLoopGen::INTERCEPTOR_STR[]="_interceptor";
InterceptorInputPort::InterceptorInputPort(const std::string& name, Node *node, TypeCode* type):AnyInputPort(name,node,type),
DataPort(name,node,type),Port(node),
{
}
-bool AnySplitOutputPort::addInPort(InPort *inPort) throw(YACS::Exception)
+bool AnySplitOutputPort::addInPort(InPort *inPort)
{
bool ret=OutputPort::addInPort(inPort);
if(_repr)
_repr->getAllRepresented(represented);
}
-int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward) throw(YACS::Exception)
+int AnySplitOutputPort::removeInPort(InPort *inPort, bool forward)
{
bool ret=OutputPort::removeInPort(inPort,forward);
if(_repr)
for (int i = 0; i < nbElem; i++)
{
Any *val = getValueAtRank(i);
- switch (val->getType()->kind())
+ switch (((YACS::ENGINE::TypeCodeSeq *)edGetType())->contentType()->kind())
{
case Double:
- xmldump << "<value><double>" << val->getDoubleValue() << "</double></value>" << endl;
+ xmldump << "<value><double>" << setprecision(16) << val->getDoubleValue() << "</double></value>" << endl;
break;
case Int:
xmldump << "<value><int>" << val->getIntValue() << "</int></value>" << endl;
xmldump << "<value><string>" << val->getStringValue() << "</string></value>" << endl;
break;
case Objref:
- xmldump << "<value><objref>" << val->getStringValue() << "</objref></value>" << endl;
+ xmldump << "<value><objref>" << ToBase64(val->getStringValue()) << "</objref></value>" << endl;
break;
default:
xmldump << "<value><error> NO_SERIALISATION_AVAILABLE </error></value>" << endl;
}
SplitterNode::SplitterNode(const std::string& name, TypeCode *typeOfData,
- ForEachLoop *father):ElementaryNode(name),
+ ForEachLoopGen *father):ElementaryNode(name),
_dataPortToDispatch(NAME_OF_SEQUENCE_INPUT,
this,(TypeCodeSeq *)TypeCode::sequenceTc("","",typeOfData))
{
_father=father;
}
-SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoop *father):ElementaryNode(other,father),
+SplitterNode::SplitterNode(const SplitterNode& other, ForEachLoopGen *father):ElementaryNode(other,father),
_dataPortToDispatch(other._dataPortToDispatch,this)
{
}
-InputPort *SplitterNode::getInputPort(const std::string& name) const throw(YACS::Exception)
+InputPort *SplitterNode::getInputPort(const std::string& name) const
{
if(name==NAME_OF_SEQUENCE_INPUT)
return (InputPort *)&_dataPortToDispatch;
Node *SplitterNode::simpleClone(ComposedNode *father, bool editionOnly) const
{
- return new SplitterNode(*this,(ForEachLoop *)father);
+ return new SplitterNode(*this,(ForEachLoopGen *)father);
}
unsigned SplitterNode::getNumberOfElements() const
void SplitterNode::putSplittedValueOnRankTo(int rankInSeq, int branch, bool first)
{
Any *valueToDispatch=_dataPortToDispatch.getValueAtRank(rankInSeq);
- ForEachLoop *fatherTyped=(ForEachLoop *)_father;
+ ForEachLoopGen *fatherTyped=(ForEachLoopGen *)_father;
fatherTyped->putValueOnBranch(valueToDispatch,branch,first);
valueToDispatch->decrRef();
}
-FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoop *loop, bool normalFinish):ElementaryNode(NAME),
- _loop(loop),
- _normalFinish(normalFinish)
+FakeNodeForForEachLoop::FakeNodeForForEachLoop(ForEachLoopGen *loop, bool normalFinish):ElementaryNode(NAME),
+ _loop(loop),
+ _normalFinish(normalFinish)
{
_state=YACS::TOACTIVATE;
_father=_loop->getFather();
_loop->setState(YACS::DONE);
}
-ForEachLoop::ForEachLoop(const std::string& name, TypeCode *typeOfDataSplitted):DynParaLoop(name,typeOfDataSplitted),
- _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
- _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+ForEachLoopPassedData::ForEachLoopPassedData(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs):_passedIds(passedIds),_passedOutputs(passedOutputs),_nameOfOutputs(nameOfOutputs)
+{
+ std::size_t sz(_passedIds.size()),sz1(passedOutputs.size()),sz2(nameOfOutputs.size());
+ if(sz1!=sz2)
+ throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : nameOfOutputs and passedOutputs must have the same size !");
+ for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+ {
+ const SequenceAny *elt(*it);
+ if(elt)
+ if(sz!=(std::size_t)elt->size())
+ throw YACS::Exception("ForEachLoopPassedData::ForEachLoopPassedData : incoherent input of passed data !");
+ }
+ for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+ {
+ SequenceAny *elt(*it);
+ if(elt)
+ elt->incrRef();
+ }
+}
+
+ForEachLoopPassedData::ForEachLoopPassedData(const ForEachLoopPassedData& copy)
+: _passedIds(copy._passedIds),
+ _passedOutputs(copy._passedOutputs),
+ _nameOfOutputs(copy._nameOfOutputs),
+ _flagsIds(copy._flagsIds)
+{
+}
+
+ForEachLoopPassedData::~ForEachLoopPassedData()
+{
+ for(std::vector<SequenceAny *>::iterator it=_passedOutputs.begin();it!=_passedOutputs.end();it++)
+ {
+ SequenceAny *elt(*it);
+ if(elt)
+ elt->decrRef();
+ }
+}
+
+void ForEachLoopPassedData::init()
+{
+ _flagsIds.clear();
+}
+
+void ForEachLoopPassedData::checkCompatibilyWithNb(int nbOfElts) const
+{
+ if(nbOfElts<0)
+ throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : nb of elts is expected to be > 0 !");
+ std::size_t sizeExp(_passedIds.size()),nbOfElts2(nbOfElts);
+ if(nbOfElts2<sizeExp)
+ throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set !");
+ for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+ {
+ if((*it)>=nbOfElts2)
+ throw YACS::Exception("ForEachLoopPassedData::checkCompatibilyWithNb : Invalid nb of elemts in input seq regarding passed data set 2 !");
+ }
+ _flagsIds.resize(nbOfElts);
+ std::fill(_flagsIds.begin(),_flagsIds.end(),false);
+ for(std::vector<unsigned int>::const_iterator it=_passedIds.begin();it!=_passedIds.end();it++)
+ {
+ if(*it<nbOfElts)
+ {
+ if(!_flagsIds[*it])
+ _flagsIds[*it]=true;
+ else
+ {
+ std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : id " << *it << " in list of ids appears more than once !";
+ throw YACS::Exception(oss.str());
+ }
+ }
+ else
+ {
+ std::ostringstream oss; oss << "ForEachLoopPassedData::checkCompatibilyWithNb : Presence of id " << *it << " in list of ids ! Must be in [0," << nbOfElts << ") !";
+ throw YACS::Exception(oss.str());
+ }
+ }
+}
+
+void ForEachLoopPassedData::checkLevel2(const std::vector<AnyInputPort *>& ports) const
+{
+ std::size_t sz(_nameOfOutputs.size());
+ if(sz!=ports.size())
+ throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : mismatch of size of vectors !");
+ for(std::size_t i=0;i<sz;i++)
+ {
+ AnyInputPort *elt(ports[i]);
+ if(!elt)
+ throw YACS::Exception("ForEachLoopPassedData::checkLevel2 : presence of null instance !");
+ if(_nameOfOutputs[i]!=elt->getName())
+ {
+ std::ostringstream oss; oss << "ForEachLoopPassedData::checkLevel2 : At pos #" << i << " the name is not OK !";
+ throw YACS::Exception(oss.str());
+ }
+ }
+}
+
+/*!
+ * performs local to abs id. Input \a localId refers to an id in all jobs to perform. Returned id refers to pos in whole output sequences.
+ */
+int ForEachLoopPassedData::toAbsId(int localId) const
+{
+ if(localId<0)
+ throw YACS::Exception("ForEachLoopPassedData::toAbsId : local pos must be >= 0 !");
+ int ret(0),curLocId(0);
+ for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+ {
+ if(!*it)
+ {
+ if(localId==curLocId)
+ return ret;
+ curLocId++;
+ }
+ }
+ throw YACS::Exception("ForEachLoopPassedData::toAbsId : not referenced Id !");
+}
+
+/*!
+ * Equivalent to toAbsId except that only ON are considered here.
+ */
+int ForEachLoopPassedData::toAbsIdNot(int localId) const
+{
+ if(localId<0)
+ throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : local pos must be >= 0 !");
+ int ret(0),curLocId(0);
+ for(std::vector<bool>::const_iterator it=_flagsIds.begin();it!=_flagsIds.end();it++,ret++)
+ {
+ if(*it)//<- diff is here !
+ {
+ if(localId==curLocId)
+ return ret;
+ curLocId++;
+ }
+ }
+ throw YACS::Exception("ForEachLoopPassedData::toAbsIdNot : not referenced Id !");
+}
+
+int ForEachLoopPassedData::getNumberOfElementsToDo() const
+{
+ std::size_t nbAllElts(_flagsIds.size());
+ std::size_t ret(nbAllElts-_passedIds.size());
+ return ret;
+}
+
+void ForEachLoopPassedData::assignAlreadyDone(const std::vector<SequenceAny *>& execVals) const
+{
+ std::size_t sz(execVals.size());
+ if(_passedOutputs.size()!=sz)
+ throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : mismatch of size of vectors !");
+ for(std::size_t i=0;i<sz;i++)
+ {
+ SequenceAny *elt(_passedOutputs[i]);
+ SequenceAny *eltDestination(execVals[i]);
+ if(!elt)
+ throw YACS::Exception("ForEachLoopPassedData::assignedAlreadyDone : presence of null elt !");
+ unsigned int szOfElt(elt->size());
+ for(unsigned int j=0;j<szOfElt;j++)
+ {
+ AnyPtr elt1((*elt)[j]);
+ int jAbs(toAbsIdNot(j));
+ eltDestination->setEltAtRank(jAbs,elt1);
+ }
+ }
+}
+
+ForEachLoopGen::ForEachLoopGen(const std::string& name, TypeCode *typeOfDataSplitted, std::unique_ptr<NbBranchesAbstract>&& branchManager):
+ DynParaLoop(name,typeOfDataSplitted,std::move(branchManager)),
+ _splitterNode(NAME_OF_SPLITTERNODE,typeOfDataSplitted,this),
+ _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
{
}
-ForEachLoop::ForEachLoop(const ForEachLoop& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
- _splitterNode(other._splitterNode,this),
- _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0)
+ForEachLoopGen::ForEachLoopGen(const ForEachLoopGen& other, ComposedNode *father, bool editionOnly):DynParaLoop(other,father,editionOnly),
+ _splitterNode(other._splitterNode,this),
+ _execCurrentId(0),_nodeForSpecialCases(0),_currentIndex(0),_passedData(0)
{
int i=0;
if(!editionOnly)
{
AnySplitOutputPort *temp=new AnySplitOutputPort(*(*iter2),this);
InterceptorInputPort *interc=new InterceptorInputPort(*other._intecptrsForOutGoingPorts[i],this);
- temp->addRepr(getOutPort((*iter2)->getName()),interc);
+ temp->addRepr(getOutPort(other.getOutPortName((*iter2)->getRepr())),interc);
interc->setRepr(temp);
_outGoingPorts.push_back(temp);
_intecptrsForOutGoingPorts.push_back(interc);
}
}
-Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
-{
- return new ForEachLoop(*this,father,editionOnly);
-}
-
-ForEachLoop::~ForEachLoop()
+ForEachLoopGen::~ForEachLoopGen()
{
cleanDynGraph();
for(vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
delete *iter;
for(vector<InterceptorInputPort *>::iterator iter2=_intecptrsForOutGoingPorts.begin();iter2!=_intecptrsForOutGoingPorts.end();iter2++)
delete *iter2;
+ delete _passedData;
}
-void ForEachLoop::init(bool start)
+void ForEachLoopGen::init(bool start)
{
DynParaLoop::init(start);
_splitterNode.init(start);
cleanDynGraph();
_currentIndex = 0;
exUpdateProgress();
+ if(_passedData)
+ _passedData->init();
}
-void ForEachLoop::exUpdateState()
+void ForEachLoopGen::exUpdateState()
{
- DEBTRACE("ForEachLoop::exUpdateState");
+ DEBTRACE("ForEachLoopGen::exUpdateState");
if(_state == YACS::DISABLED)
return;
+ if(_state == YACS::DONE)
+ return;
if(_inGate.exIsReady())
{
//internal graph update
int i;
- int nbOfBr=_nbOfBranches.getIntValue();
- int nbOfElts=_splitterNode.getNumberOfElements();
+ int nbOfElts(_splitterNode.getNumberOfElements()),nbOfEltsDone(0);
+ int nbOfBr(_nbOfBranches->getNumberOfBranches(nbOfElts));
+ if(_passedData)
+ {
+ _passedData->checkCompatibilyWithNb(nbOfElts);
+ nbOfEltsDone=_passedData->getNumberOfEltsAlreadyDone();
+ }
+ int nbOfEltsToDo(nbOfElts-nbOfEltsDone);
DEBTRACE("nbOfElts=" << nbOfElts);
DEBTRACE("nbOfBr=" << nbOfBr);
- if(nbOfElts==0)
+ if(nbOfEltsToDo==0)
{
prepareSequenceValues(0);
delete _nodeForSpecialCases;
setState(YACS::ACTIVATED);
return ;
}
- if(nbOfBr>nbOfElts)
- nbOfBr=nbOfElts;
+ if(nbOfBr>nbOfEltsToDo)
+ nbOfBr=nbOfEltsToDo;
_execNodes.resize(nbOfBr);
_execIds.resize(nbOfBr);
_execOutGoingPorts.resize(nbOfBr);
{
for(i=0;i<nbOfBr;i++)
{
- DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
- _execIds[i]=_execCurrentId;
DEBTRACE( "-------------- 2" );
vector<Node *> clonedNodes = cloneAndPlaceNodesCoherently(origNodes);
if(_initNode)
DEBTRACE( "-------------- 5" );
createOutputOutOfScopeInterceptors(i);
DEBTRACE( "-------------- 6" );
- _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,i,true);
+ }
+ for(i=0;i<nbOfBr;i++)
+ {
+ DEBTRACE( "-------------- 1 " << i << " " << _execCurrentId);
+ _execIds[i]=_execCurrentId;
+ int posInAbs(_execCurrentId);
+ if(_passedData)
+ posInAbs=_passedData->toAbsId(_execCurrentId);
+ _splitterNode.putSplittedValueOnRankTo(posInAbs,i,true);
+ _execCurrentId++;
DEBTRACE( "-------------- 7" );
- }
+ }
+ if(_passedData)
+ {
+ _passedData->checkLevel2(_execOutGoingPorts[0]);
+ _passedData->assignAlreadyDone(_execVals);
+ }
+ // clean inputs data coming from the outside in _node
+ set< InPort * > portsToSetVals=getAllInPortsComingFromOutsideOfCurrentScope();
+ for(auto iter : portsToSetVals)
+ {
+ InputPort *curPortCasted=(InputPort *) iter;//Cast granted by ForEachLoopGen::buildDelegateOf(InPort)
+ if(!curPortCasted->canSafelySqueezeMemory())// this can appear strange ! if not safelySqueeze -> release. Nevertheless it is true.
+ curPortCasted->releaseData(); // these input ports have been incremented with InputPort::put into DynParaLoop::prepareInputsFromOutOfScope. So they can be released now.
+ }
}
catch(YACS::Exception& ex)
{
//ForEachLoop must be put in error and the exception rethrown to notify the caller
- DEBTRACE( "ForEachLoop::exUpdateState: " << ex.what() );
+ DEBTRACE( "ForEachLoopGen::exUpdateState: " << ex.what() );
setState(YACS::ERROR);
+ setErrorDetails(ex.what());
exForwardFailed();
throw;
}
}
}
-void ForEachLoop::exUpdateProgress()
+void ForEachLoopGen::exUpdateProgress()
{
// emit notification to all observers registered with the dispatcher on any change of the node's state
sendEvent("progress");
}
-void ForEachLoop::getReadyTasks(std::vector<Task *>& tasks)
+void ForEachLoopGen::getReadyTasks(std::vector<Task *>& tasks)
{
if(!_node)
return;
}
}
-int ForEachLoop::getNumberOfInputPorts() const
+int ForEachLoopGen::getNumberOfInputPorts() const
{
return DynParaLoop::getNumberOfInputPorts()+1;
}
-void ForEachLoop::checkNoCyclePassingThrough(Node *node) throw(YACS::Exception)
+void ForEachLoopGen::checkNoCyclePassingThrough(Node *node)
{
//TO DO
}
-void ForEachLoop::selectRunnableTasks(std::vector<Task *>& tasks)
+void ForEachLoopGen::selectRunnableTasks(std::vector<Task *>& tasks)
{
}
-std::list<InputPort *> ForEachLoop::getSetOfInputPort() const
+std::list<InputPort *> ForEachLoopGen::getSetOfInputPort() const
{
list<InputPort *> ret=DynParaLoop::getSetOfInputPort();
ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
return ret;
}
-std::list<InputPort *> ForEachLoop::getLocalInputPorts() const
+std::list<InputPort *> ForEachLoopGen::getLocalInputPorts() const
{
list<InputPort *> ret=DynParaLoop::getLocalInputPorts();
ret.push_back((InputPort *)&_splitterNode._dataPortToDispatch);
return ret;
}
-InputPort *ForEachLoop::getInputPort(const std::string& name) const throw(YACS::Exception)
+InputPort *ForEachLoopGen::getInputPort(const std::string& name) const
{
if(name==SplitterNode::NAME_OF_SEQUENCE_INPUT)
return (InputPort *)&_splitterNode._dataPortToDispatch;
return DynParaLoop::getInputPort(name);
}
-OutputPort *ForEachLoop::getOutputPort(const std::string& name) const throw(YACS::Exception)
+OutputPort *ForEachLoopGen::getOutputPort(const std::string& name) const
{
for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
{
return DynParaLoop::getOutputPort(name);
}
-OutPort *ForEachLoop::getOutPort(const std::string& name) const throw(YACS::Exception)
+OutPort *ForEachLoopGen::getOutPort(const std::string& name) const
{
for(vector<AnySplitOutputPort *>::const_iterator iter=_outGoingPorts.begin();iter!=_outGoingPorts.end();iter++)
{
return DynParaLoop::getOutPort(name);
}
-Node *ForEachLoop::getChildByShortName(const std::string& name) const throw(YACS::Exception)
+Node *ForEachLoopGen::getChildByShortName(const std::string& name) const
{
if(name==NAME_OF_SPLITTERNODE)
return (Node *)&_splitterNode;
* \param node : the child node that has finished
* \return the state change
*/
-YACS::Event ForEachLoop::updateStateOnFinishedEventFrom(Node *node)
+YACS::Event ForEachLoopGen::updateStateOnFinishedEventFrom(Node *node)
{
DEBTRACE("updateStateOnFinishedEventFrom " << node->getName() << " " << node->getState());
unsigned int id;
switch(getIdentityOfNotifyerNode(node,id))
{
case INIT_NODE:
- _execNodes[id]->exUpdateState();
- _nbOfEltConsumed++;
- _initializingCounter--;
- if (_initializingCounter == 0) _initNode->setState(DONE);
- break;
+ return updateStateForInitNodeOnFinishedEventFrom(node,id);
case WORK_NODE:
- _currentIndex++;
- exUpdateProgress();
- storeOutValsInSeqForOutOfScopeUse(_execIds[id],id);
- if(_execCurrentId==_splitterNode.getNumberOfElements())
- {//No more elements of _dataPortToDispatch to treat
- _execIds[id]=NOT_RUNNING_BRANCH_ID;
- //analyzing if some samples are still on treatment on other branches.
- bool isFinished=true;
- for(int i=0;i<_execIds.size() && isFinished;i++)
- isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
- if(isFinished)
- {
- try
+ return updateStateForWorkNodeOnFinishedEventFrom(node,id,true);
+ case FINALIZE_NODE:
+ return updateStateForFinalizeNodeOnFinishedEventFrom(node,id);
+ default:
+ YASSERT(false);
+ }
+ return YACS::NOEVENT;
+}
+
+YACS::Event ForEachLoopGen::updateStateForInitNodeOnFinishedEventFrom(Node *node, unsigned int id)
+{
+ _execNodes[id]->exUpdateState();
+ _nbOfEltConsumed++;
+ _initializingCounter--;
+ _currentIndex++;
+ if (_initializingCounter == 0)
+ _initNode->setState(DONE);
+ return YACS::NOEVENT;
+}
+
+/*!
+ * \param [in] isNormalFinish - if true
+ */
+YACS::Event ForEachLoopGen::updateStateForWorkNodeOnFinishedEventFrom(Node *node, unsigned int id, bool isNormalFinish)
+{
+ _currentIndex++;
+ exUpdateProgress();
+ if(isNormalFinish)
+ {
+ int globalId(_execIds[id]);
+ if(_passedData)
+ globalId=_passedData->toAbsId(globalId);
+ sendEvent2("progress_ok",&globalId);
+ storeOutValsInSeqForOutOfScopeUse(globalId,id);
+ }
+ else
+ {
+ int globalId(_execIds[id]);
+ if(_passedData)
+ globalId=_passedData->toAbsId(globalId);
+ sendEvent2("progress_ko",&globalId);
+ }
+ //
+ if(_execCurrentId==getFinishedId())
+ {//No more elements of _dataPortToDispatch to treat
+ _execIds[id]=NOT_RUNNING_BRANCH_ID;
+ //analyzing if some samples are still on treatment on other branches.
+ bool isFinished(true);
+ for(int i=0;i<_execIds.size() && isFinished;i++)
+ isFinished=(_execIds[i]==NOT_RUNNING_BRANCH_ID);
+ if(isFinished)
+ {
+ try
+ {
+ if(_failedCounter!=0)
+ {// case of keepgoing mode + a failed
+ std::ostringstream oss; oss << "Keep Going mode activated and some errors (" << _failedCounter << ")reported !";
+ DEBTRACE("ForEachLoopGen::updateStateOnFinishedEventFrom : "<< oss.str());
+ setState(YACS::FAILED);
+ return YACS::ABORT;
+ }
+ pushAllSequenceValues();
+
+ if (_node)
{
- pushAllSequenceValues();
-
- if (_node)
- {
- _node->setState(YACS::DONE);
-
- ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
- if (compNode)
- {
- std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
- std::list<Node *>::iterator iter=aChldn.begin();
- for(;iter!=aChldn.end();iter++)
- (*iter)->setState(YACS::DONE);
- }
- }
+ _node->setState(YACS::DONE);
- if (_finalizeNode == NULL)
+ ComposedNode* compNode = dynamic_cast<ComposedNode*>(_node);
+ if (compNode)
{
- // No finalize node, we just finish the loop at the end of exec nodes execution
- setState(YACS::DONE);
- return YACS::FINISH;
- }
- else
- {
- // Run the finalize nodes, the loop will be done only when they all finish
- _unfinishedCounter = 0; // This counter indicates how many branches are not finished
- for (int i=0 ; i<_execIds.size() ; i++)
- {
- YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
- DEBTRACE("Launching finalize node for branch " << i);
- _execFinalizeNodes[i]->exUpdateState();
- _unfinishedCounter++;
- }
- return YACS::NOEVENT;
+ std::list<Node *> aChldn = compNode->getAllRecursiveConstituents();
+ std::list<Node *>::iterator iter=aChldn.begin();
+ for(;iter!=aChldn.end();iter++)
+ (*iter)->setState(YACS::DONE);
}
}
- catch(YACS::Exception& ex)
+
+ if (_finalizeNode == NULL)
{
- DEBTRACE("ForEachLoop::updateStateOnFinishedEventFrom: "<<ex.what());
- //no way to push results : put following nodes in FAILED state
- //TODO could be more fine grain : put only concerned nodes in FAILED state
- exForwardFailed();
- setState(YACS::ERROR);
- return YACS::ABORT;
+ // No finalize node, we just finish the loop at the end of exec nodes execution
+ setState(YACS::DONE);
+ return YACS::FINISH;
}
- }
- }
- else if(_state == YACS::ACTIVATED)
- {//more elements to do and loop still activated
- _execIds[id]=_execCurrentId;
- node->init(false);
- _splitterNode.putSplittedValueOnRankTo(_execCurrentId++,id,false);
- node->exUpdateState();
- forwardExecStateToOriginalBody(node);
- _nbOfEltConsumed++;
- }
- else
- {//elements to process and loop no more activated
- DEBTRACE("foreach loop state " << _state);
- }
- break;
- case FINALIZE_NODE:
- {
- DEBTRACE("Finalize node finished on branch " << id);
- _unfinishedCounter--;
- _currentIndex++;
- exUpdateProgress();
- DEBTRACE(_unfinishedCounter << " finalize nodes still running");
- if (_unfinishedCounter == 0)
- {
- _finalizeNode->setState(YACS::DONE);
- setState(YACS::DONE);
- return YACS::FINISH;
+ else
+ {
+ // Run the finalize nodes, the loop will be done only when they all finish
+ _unfinishedCounter = 0; // This counter indicates how many branches are not finished
+ for (int i=0 ; i<_execIds.size() ; i++)
+ {
+ YASSERT(_execIds[i] == NOT_RUNNING_BRANCH_ID);
+ DEBTRACE("Launching finalize node for branch " << i);
+ _execFinalizeNodes[i]->exUpdateState();
+ _unfinishedCounter++;
+ }
+ return YACS::NOEVENT;
+ }
+ }
+ catch(YACS::Exception& ex)
+ {
+ DEBTRACE("ForEachLoopGen::updateStateOnFinishedEventFrom: "<<ex.what());
+ //no way to push results : put following nodes in FAILED state
+ //TODO could be more fine grain : put only concerned nodes in FAILED state
+ exForwardFailed();
+ setState(YACS::ERROR);
+ return YACS::ABORT;
+ }
}
- else
- return YACS::NOEVENT;
- break;
}
- default:
- YASSERT(false);
+ else if(_state == YACS::ACTIVATED)
+ {//more elements to do and loop still activated
+ _execIds[id]=_execCurrentId;
+ int posInAbs(_execCurrentId);
+ if(_passedData)
+ posInAbs=_passedData->toAbsId(_execCurrentId);
+ _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
+ //forwardExecStateToOriginalBody(node);
+ node->init(false);
+ _execCurrentId++;
+ node->exUpdateState();
+ //forwardExecStateToOriginalBody(node);
+ _nbOfEltConsumed++;
+ }
+ else
+ {//elements to process and loop no more activated
+ DEBTRACE("foreach loop state " << _state);
}
return YACS::NOEVENT;
}
-YACS::Event ForEachLoop::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
+YACS::Event ForEachLoopGen::updateStateForFinalizeNodeOnFinishedEventFrom(Node *node, unsigned int id)
+{
+ DEBTRACE("Finalize node finished on branch " << id);
+ _unfinishedCounter--;
+ _currentIndex++;
+ exUpdateProgress();
+ DEBTRACE(_unfinishedCounter << " finalize nodes still running");
+ if (_unfinishedCounter == 0)
+ {
+ _finalizeNode->setState(YACS::DONE);
+ setState(YACS::DONE);
+ return YACS::FINISH;
+ }
+ else
+ return YACS::NOEVENT;
+}
+
+YACS::Event ForEachLoopGen::updateStateOnFailedEventFrom(Node *node, const Executor *execInst)
+{
+ unsigned int id;
+ DynParaLoop::TypeOfNode ton(getIdentityOfNotifyerNode(node,id));
+ // TODO: deal with keepgoing without the dependency to Executor
+ if(ton!=WORK_NODE || !execInst->getKeepGoingProperty())
+ return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
+ else
+ {
+ _failedCounter++;
+ return updateStateForWorkNodeOnFinishedEventFrom(node,id,false);
+ }
+}
+
+void ForEachLoopGen::InterceptorizeNameOfPort(std::string& portName)
{
- return DynParaLoop::updateStateOnFailedEventFrom(node,execInst);
+ std::replace_if(portName.begin(), portName.end(), std::bind(std::equal_to<char>(), '.',std::placeholders::_1), '_');
+ portName += INTERCEPTOR_STR;
}
-void ForEachLoop::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
+void ForEachLoopGen::buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
{
DynParaLoop::buildDelegateOf(port,finalTarget,pointsOfView);
string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
}
else
{
- TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",port.first->edGetType());
- AnySplitOutputPort *newPort=new AnySplitOutputPort(getPortName(port.first),this,newTc);
- InterceptorInputPort *intercptor=new InterceptorInputPort(string("intercptr for ")+getPortName(port.first),this,port.first->edGetType());
+ TypeCode *tcTrad((YACS::ENGINE::TypeCode*)finalTarget->edGetType()->subContentType(getFEDeltaBetween(port.first,finalTarget)));
+ TypeCodeSeq *newTc=(TypeCodeSeq *)TypeCode::sequenceTc("","",tcTrad);
+ // The out going ports belong to the ForEachLoop, whereas
+ // the delegated port belongs to a node child of the ForEachLoop.
+ // The name of the delegated port contains dots (bloc.node.outport),
+ // whereas the name of the out going port shouldn't do.
+ std::string outputPortName(getPortName(port.first));
+ InterceptorizeNameOfPort(outputPortName);
+ AnySplitOutputPort *newPort(new AnySplitOutputPort(outputPortName,this,newTc));
+ InterceptorInputPort *intercptor(new InterceptorInputPort(outputPortName + "_in",this,tcTrad));
intercptor->setRepr(newPort);
newTc->decrRef();
newPort->addRepr(port.first,intercptor);
}
}
else
- throw Exception("ForEachLoop::buildDelegateOf : not implemented for DS because not specified");
+ throw Exception("ForEachLoopGen::buildDelegateOf : not implemented for DS because not specified");
}
-void ForEachLoop::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
+void ForEachLoopGen::getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
{
string typeOfPortInstance=(port.first)->getNameOfTypeOfCurrentInstance();
if(typeOfPortInstance==OutputPort::NAME)
break;
if(iter==_outGoingPorts.end())
{
- string what("ForEachLoop::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
+ string what("ForEachLoopGen::getDelegateOf : Port with name "); what+=port.first->getName(); what+=" not exported by ForEachLoop "; what+=_name;
throw Exception(what);
}
else
port.first=(*iter);
}
else
- throw Exception("ForEachLoop::getDelegateOf : not implemented because not specified");
+ throw Exception("ForEachLoopGen::getDelegateOf : not implemented because not specified");
}
-void ForEachLoop::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(YACS::Exception)
+void ForEachLoopGen::releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView)
{
string typeOfPortInstance=portDwn->getNameOfTypeOfCurrentInstance();
if(typeOfPortInstance==OutputPort::NAME)
}
}
-OutPort *ForEachLoop::getDynOutPortByAbsName(int branchNb, const std::string& name)
+OutPort *ForEachLoopGen::getDynOutPortByAbsName(int branchNb, const std::string& name)
{
string portName, nodeName;
splitNamesBySep(name,Node::SEP_CHAR_IN_PORT,nodeName,portName,false);
Node *staticChild = getChildByName(nodeName);
- return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoop::buildDelegateOf)
+ return _execNodes[branchNb]->getOutPort(portName);//It's impossible(garanteed by YACS::ENGINE::ForEachLoopGen::buildDelegateOf)
//that a link starting from _initNode goes out of scope of 'this'.
}
-void ForEachLoop::cleanDynGraph()
+void ForEachLoopGen::cleanDynGraph()
{
DynParaLoop::cleanDynGraph();
for(vector< SequenceAny *>::iterator iter3=_execVals.begin();iter3!=_execVals.end();iter3++)
_execOutGoingPorts.clear();
}
-void ForEachLoop::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
+void ForEachLoopGen::storeOutValsInSeqForOutOfScopeUse(int rank, int branchNb)
{
vector<AnyInputPort *>::iterator iter;
int i=0;
}
}
-void ForEachLoop::prepareSequenceValues(int sizeOfSamples)
+int ForEachLoopGen::getFinishedId()
+{
+ if(!_passedData)
+ return _splitterNode.getNumberOfElements();
+ else
+ return _passedData->getNumberOfElementsToDo();
+}
+
+void ForEachLoopGen::prepareSequenceValues(int sizeOfSamples)
{
_execVals.resize(_outGoingPorts.size());
vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
_execVals[i]=SequenceAny::New((*iter)->edGetType()->contentType(),sizeOfSamples);
}
-void ForEachLoop::pushAllSequenceValues()
+void ForEachLoopGen::pushAllSequenceValues()
{
vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
int i=0;
(*iter)->put((const void *)_execVals[i]);
}
-void ForEachLoop::createOutputOutOfScopeInterceptors(int branchNb)
+void ForEachLoopGen::createOutputOutOfScopeInterceptors(int branchNb)
{
vector<AnySplitOutputPort *>::iterator iter=_outGoingPorts.begin();
int i=0;
//AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,(*iter)->edGetType());
OutPort *portOut=getDynOutPortByAbsName(branchNb,getOutPortName(((*iter)->getRepr())));
DEBTRACE( portOut->getName() );
- AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,portOut->edGetType());
+ TypeCode *tc((TypeCode *)(*iter)->edGetType()->contentType());
+ AnyInputPort *interceptor=new AnyInputPort((*iter)->getName(),this,tc);
portOut->addInPort(interceptor);
_execOutGoingPorts[branchNb].push_back(interceptor);
}
}
-void ForEachLoop::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
- InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(YACS::Exception)
+void ForEachLoopGen::checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
+ InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd)
{
- if(isInMyDescendance(start->getNode())==_node)
- throw Exception("ForEachLoop::checkLinkPossibility : A link from work node to init node not permitted");
+ DynParaLoop::checkLinkPossibility(start, pointsOfViewStart, end, pointsOfViewEnd);
+ if(end->getNode() == &_splitterNode)
+ throw Exception("Illegal link within a foreach loop: \
+the 'SmplsCollection' port cannot be linked within the scope of the loop.");
+ if(end == _nbOfBranches->getPort())
+ throw Exception("Illegal link within a foreach loop: \
+the 'nbBranches' port cannot be linked within the scope of the loop.");
}
-std::list<OutputPort *> ForEachLoop::getLocalOutputPorts() const
+std::list<OutputPort *> ForEachLoopGen::getLocalOutputPorts() const
{
list<OutputPort *> ret;
ret.push_back(getOutputPort(NAME_OF_SPLITTED_SEQ_OUT));
return ret;
}
-void ForEachLoop::accept(Visitor *visitor)
-{
- visitor->visitForEachLoop(this);
-}
-
//! Dump the node state to a stream
/*!
* \param os : the output stream
*/
-void ForEachLoop::writeDot(std::ostream &os) const
+void ForEachLoopGen::writeDot(std::ostream &os) const
{
os << " subgraph cluster_" << getId() << " {\n" ;
//only one node in a loop
}
//! Reset the state of the node and its children depending on the parameter level
-void ForEachLoop::resetState(int level)
+void ForEachLoopGen::resetState(int level)
{
if(level==0)return;
DynParaLoop::resetState(level);
_execCurrentId=0;
- //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object)
cleanDynGraph();
}
-std::string ForEachLoop::getProgress() const
+std::string ForEachLoopGen::getProgress() const
{
- int nbElems = _splitterNode.getNumberOfElements();
+ int nbElems(getNbOfElementsToBeProcessed());
std::stringstream aProgress;
if (nbElems > 0)
aProgress << _currentIndex << "/" << nbElems;
return aProgress.str();
}
+//! Get the progress weight for all elementary nodes
+/*!
+ * Only elementary nodes have weight. For each node in the loop, the weight done is multiplied
+ * by the number of elements done and the weight total by the number total of elements
+ */
+list<ProgressWeight> ForEachLoopGen::getProgressWeight() const
+{
+ list<ProgressWeight> ret;
+ list<Node *> setOfNode=edGetDirectDescendants();
+ int elemDone=getCurrentIndex();
+ int elemTotal=getNbOfElementsToBeProcessed();
+ for(list<Node *>::const_iterator iter=setOfNode.begin();iter!=setOfNode.end();iter++)
+ {
+ list<ProgressWeight> myCurrentSet=(*iter)->getProgressWeight();
+ for(list<ProgressWeight>::iterator iter=myCurrentSet.begin();iter!=myCurrentSet.end();iter++)
+ {
+ (*iter).weightDone=((*iter).weightTotal) * elemDone;
+ (*iter).weightTotal*=elemTotal;
+ }
+ ret.insert(ret.end(),myCurrentSet.begin(),myCurrentSet.end());
+ }
+ return ret;
+}
+
+int ForEachLoopGen::getNbOfElementsToBeProcessed() const
+{
+ int nbOfElems(_splitterNode.getNumberOfElements());
+ int nbBranches = _nbOfBranches->getNumberOfBranches(nbOfElems);
+ return nbOfElems
+ + (_initNode ? nbBranches:0)
+ + (_finalizeNode ? nbBranches:0) ;
+}
+
/*!
* This method allows to retrieve the state of \a this during execution or after. This method works even if this is \b NOT complete, or during execution or after a failure in \a this.
* The typical usage of this method is to retrieve the results of items that passed successfully to avoid to lose all of them if only one fails.
*
* \sa edGetSeqOfSamplesPort
*/
-std::vector<unsigned int> ForEachLoop::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
+std::vector<unsigned int> ForEachLoopGen::getPassedResults(Executor *execut, std::vector<SequenceAny *>& outputs, std::vector<std::string>& nameOfOutputs) const
{
YACS::BASES::AutoLocker<YACS::BASES::Mutex> alck(&(execut->getTheMutexForSchedulerUpdate()));
if(_execVals.empty())
return std::vector<unsigned int>();
if(_execOutGoingPorts.empty())
return std::vector<unsigned int>();
- std::size_t sz(_execVals.size()); outputs.resize(sz); nameOfOutputs.resize(sz);
+ std::size_t sz(_execVals.size());
+ outputs.resize(sz);
+ nameOfOutputs.resize(sz);
const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
for(std::size_t i=0;i<sz;i++)
{
return _execVals[0]->getSetItems();
}
+/*!
+ * This method is typically useful for post-mortem relaunch to avoid to recompute already passed cases. This method takes in input exactly the parameters retrieved by
+ * getPassedResults method.
+ */
+void ForEachLoopGen::assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs)
+{
+ delete _passedData;
+ _failedCounter=0;
+ _passedData=new ForEachLoopPassedData(passedIds,passedOutputs,nameOfOutputs);
+}
+
+int ForEachLoopGen::getFEDeltaBetween(OutPort *start, InPort *end)
+{
+ Node *ns(start->getNode()),*ne(end->getNode());
+ ComposedNode *co(getLowestCommonAncestor(ns,ne));
+ int ret(0);
+ Node *work(ns);
+ while(work!=co)
+ {
+ ForEachLoopGen *isFE(dynamic_cast<ForEachLoopGen *>(work));
+ if(isFE)
+ ret++;
+ work=work->getFather();
+ }
+ if(dynamic_cast<AnySplitOutputPort *>(start))
+ ret--;
+ return ret;
+}
+
+/*!
+ * This method is used to obtain the values already processed by the ForEachLoop.
+ * A new ForEachLoopPassedData object is returned. You have to delete it.
+ */
+ForEachLoopPassedData* ForEachLoopGen::getProcessedData()const
+{
+ std::vector<SequenceAny *> outputs;
+ std::vector<std::string> nameOfOutputs;
+ if(_execVals.empty() || _execOutGoingPorts.empty())
+ return new ForEachLoopPassedData(std::vector<unsigned int>(), outputs, nameOfOutputs);
+ std::size_t sz(_execVals.size());
+ outputs.resize(sz);
+ nameOfOutputs.resize(sz);
+ const std::vector<AnyInputPort *>& ports(_execOutGoingPorts[0]);
+ for(std::size_t i=0;i<sz;i++)
+ {
+ outputs[i]=_execVals[i]->removeUnsetItemsFromThis();
+ nameOfOutputs[i]=ports[i]->getName();
+ }
+ return new ForEachLoopPassedData(_execVals[0]->getSetItems(), outputs, nameOfOutputs);
+}
+
+void ForEachLoopGen::setProcessedData(ForEachLoopPassedData* processedData)
+{
+ if(_passedData)
+ delete _passedData;
+ _passedData = processedData;
+}
+
+/*!
+ * \param portName : "interceptorized" name of port.
+ */
+const YACS::ENGINE::TypeCode* ForEachLoopGen::getOutputPortType(const std::string& portName)const
+{
+ const YACS::ENGINE::TypeCode* ret=NULL;
+ vector<AnySplitOutputPort *>::const_iterator it;
+ for(it=_outGoingPorts.begin();it!=_outGoingPorts.end() && ret==NULL;it++)
+ {
+ std::string originalPortName(getPortName(*it));
+ //InterceptorizeNameOfPort(originalPortName);
+ DEBTRACE("ForEachLoopGen::getOutputPortType compare " << portName << " == " << originalPortName);
+ if(originalPortName == portName)
+ {
+ ret = (*it)->edGetType()->contentType();
+ }
+ }
+ return ret;
+}
+
+Node *ForEachLoop::simpleClone(ComposedNode *father, bool editionOnly) const
+{
+ return new ForEachLoop(*this,father,editionOnly);
+}
+
+void ForEachLoop::accept(Visitor *visitor)
+{
+ visitor->visitForEachLoop(this);
+}
+
+void ForEachLoopDyn::accept(Visitor *visitor)
+{
+ visitor->visitForEachLoopDyn(this);
+}
+
+Node *ForEachLoopDyn::simpleClone(ComposedNode *father, bool editionOnly) const
+{
+ return new ForEachLoopDyn(*this,father,editionOnly);
+}