From 275f1e9eecc5ddc1f7de791a15709f1ec06a25ad Mon Sep 17 00:00:00 2001 From: Anthony Geay Date: Fri, 25 Jan 2019 10:00:54 +0100 Subject: [PATCH] Squeeze memory of process hosting YACS graph --- src/engine/AnyInputPort.cxx | 25 +++++++++--- src/engine/AnyInputPort.hxx | 3 ++ src/engine/ComposedNode.cxx | 6 ++- src/engine/ComposedNode.hxx | 1 + src/engine/ConditionInputPort.cxx | 12 ++++-- src/engine/ConditionInputPort.hxx | 3 +- src/engine/Dispatcher.cxx | 18 +++++++++ src/engine/Dispatcher.hxx | 2 + src/engine/DynParaLoop.cxx | 3 +- src/engine/DynParaLoop.hxx | 3 +- src/engine/ElementaryNode.cxx | 4 +- src/engine/ForEachLoop.cxx | 14 +++++-- src/engine/ForEachLoop.hxx | 2 +- src/engine/InPort.cxx | 48 +++++++++++++++++++----- src/engine/InPort.hxx | 6 ++- src/engine/InPropertyPort.cxx | 11 ++++-- src/engine/InPropertyPort.hxx | 2 +- src/engine/InputPort.cxx | 9 ++++- src/engine/InputPort.hxx | 5 ++- src/engine/Loop.cxx | 8 +++- src/engine/Loop.hxx | 2 + src/engine/Node.cxx | 12 +++++- src/engine/Node.hxx | 1 + src/engine/OptimizerLoop.hxx | 1 - src/engine/Test/RuntimeForEngineTest.cxx | 7 ++++ src/engine/Test/RuntimeForEngineTest.hxx | 1 + src/engine/Test/ToyNode.cxx | 16 +++++++- src/engine/Test/ToyNode.hxx | 2 + src/runtime/CORBAPorts.cxx | 4 ++ src/runtime/CORBAPorts.hxx | 1 + src/runtime/CppPorts.cxx | 8 +++- src/runtime/CppPorts.hxx | 1 + src/runtime/PythonNode.cxx | 48 +++++++++++++++++++++++- src/runtime/PythonNode.hxx | 7 ++++ src/runtime/PythonPorts.cxx | 26 +++++++++++-- src/runtime/PythonPorts.hxx | 5 ++- src/runtime/XMLPorts.cxx | 4 ++ src/runtime/XMLPorts.hxx | 1 + 38 files changed, 281 insertions(+), 51 deletions(-) diff --git a/src/engine/AnyInputPort.cxx b/src/engine/AnyInputPort.cxx index 2c914bbe1..6fa9acba5 100644 --- a/src/engine/AnyInputPort.cxx +++ b/src/engine/AnyInputPort.cxx @@ -84,16 +84,29 @@ void AnyInputPort::exRestoreInit() _value->incrRef(); } -void AnyInputPort::put(Any *data) +void AnyInputPort::releaseDataUnsafe() { - YACS::BASES::AutoLocker lock(&_mutex); if(_value) _value->decrRef(); + _value = nullptr; +} + +void AnyInputPort::releaseData() +{ + YACS::BASES::AutoLocker lock(&_mutex); + releaseDataUnsafe(); +} + +void AnyInputPort::put(Any *data) +{ + YACS::BASES::AutoLocker lock(&_mutex); + releaseDataUnsafe(); _value=data; - if (_value) { - _value->incrRef(); - DEBTRACE("value ref count: " << _value->getRefCnt()); - } + if (_value) + { + _value->incrRef(); + DEBTRACE("value ref count: " << _value->getRefCnt()); + } } bool AnyInputPort::isEmpty() diff --git a/src/engine/AnyInputPort.hxx b/src/engine/AnyInputPort.hxx index c276a8131..1791e3e8b 100644 --- a/src/engine/AnyInputPort.hxx +++ b/src/engine/AnyInputPort.hxx @@ -40,6 +40,7 @@ namespace YACS void exRestoreInit(); Any *getValue() const { return _value; } int getIntValue() const { return _value ? _value->getIntValue():0; } + void releaseData() override; void put(Any *data); void *get() const; virtual std::string getAsString(); @@ -48,6 +49,8 @@ namespace YACS InputPort *clone(Node *newHelder) const; std::string dump(); virtual std::string typeName() {return "YACS__ENGINE__AnyInputPort";} + protected: + void releaseDataUnsafe(); protected: Any *_value; private: diff --git a/src/engine/ComposedNode.cxx b/src/engine/ComposedNode.cxx index cc9f49fb1..ccb4f7917 100644 --- a/src/engine/ComposedNode.cxx +++ b/src/engine/ComposedNode.cxx @@ -307,13 +307,17 @@ bool ComposedNode::edAddLink(OutPort *start, InPort *end) throw(YACS::Exception) iterS=end->getNode()->_father; InPort *currentPortI=end; + bool isLoopProofLink(true),isFirstTurn(true); while(iterS!=lwstCmnAnctr) { + if(!isFirstTurn && iterS->isLoop())// isFirstTurn is a way to filter special inputs like nbBranches, splitPort... These special inputs are loopProof -> they must not be realeased by ForEachLoop::exUpdateState + isLoopProofLink=false; iterS->buildDelegateOf(currentPortI, start, allAscendanceOfNodeStart); iterS=iterS->_father; + isFirstTurn=false; } bool ret=(pO.first)->addInPort(currentPortI); - end->edNotifyReferencedBy(pO.second); + end->edNotifyReferencedBy(pO.second,isLoopProofLink); return ret; } diff --git a/src/engine/ComposedNode.hxx b/src/engine/ComposedNode.hxx index 0d6e78314..68f545694 100644 --- a/src/engine/ComposedNode.hxx +++ b/src/engine/ComposedNode.hxx @@ -76,6 +76,7 @@ namespace YACS void edRemoveLink(OutPort *start, InPort *end) throw(Exception); void edRemoveLink(OutGate *start, InGate *end) throw(Exception); virtual bool isRepeatedUnpredictablySeveralTimes() const { return false; } + virtual bool isLoop() const { return false; } virtual std::list edGetDirectDescendants() const = 0; virtual void removeRecursivelyRedundantCL(); std::list getRecursiveConstituents() const; diff --git a/src/engine/ConditionInputPort.cxx b/src/engine/ConditionInputPort.cxx index 48ee63203..d5dc9b35b 100644 --- a/src/engine/ConditionInputPort.cxx +++ b/src/engine/ConditionInputPort.cxx @@ -73,7 +73,7 @@ bool ConditionInputPort::isLinkedOutOfScope() const return _outOfScopeBackLink!=0; } -void ConditionInputPort::edNotifyReferencedBy(OutPort *fromPort) +void ConditionInputPort::edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof) { if(!((ComposedNode*)(_node))->isInMyDescendance(fromPort->getNode())) { @@ -81,7 +81,7 @@ void ConditionInputPort::edNotifyReferencedBy(OutPort *fromPort) throw Exception("ConditionInputPort::edNotifyReferenced : already linked from outside"); _outOfScopeBackLink=fromPort; } - InputPort::edNotifyReferencedBy(fromPort); + InputPort::edNotifyReferencedBy(fromPort,isLoopProof); } void ConditionInputPort::edNotifyDereferencedBy(OutPort *fromPort) @@ -104,10 +104,16 @@ void ConditionInputPort::put(const void *data) throw(ConversionException) put((Any*)data); } -void ConditionInputPort::put(Any *data) throw(ConversionException) +void ConditionInputPort::releaseData() { if(_value) _value->decrRef(); + _value=nullptr; +} + +void ConditionInputPort::put(Any *data) throw(ConversionException) +{ + ConditionInputPort::releaseData(); _value=data; _value->incrRef(); } diff --git a/src/engine/ConditionInputPort.hxx b/src/engine/ConditionInputPort.hxx index c1467b3e3..d96918c06 100644 --- a/src/engine/ConditionInputPort.hxx +++ b/src/engine/ConditionInputPort.hxx @@ -42,11 +42,12 @@ namespace YACS void exSaveInit(); void exRestoreInit(); bool isLinkedOutOfScope() const; - void edNotifyReferencedBy(OutPort *fromPort); + void edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof) override; void edNotifyDereferencedBy(OutPort *fromPort); void *get() const; virtual void put(const void *data) throw(ConversionException); void put(Any *data) throw(ConversionException); + void releaseData() override; std::string dump(); virtual std::string getAsString(); protected: diff --git a/src/engine/Dispatcher.cxx b/src/engine/Dispatcher.cxx index 95afd669d..03c0276a9 100644 --- a/src/engine/Dispatcher.cxx +++ b/src/engine/Dispatcher.cxx @@ -40,6 +40,11 @@ void Observer::notifyObserver2(Node* object,const std::string& event, void *some DEBTRACE("notifyObserver2 " << event << object ); } +void Observer::notifyObserverFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent) +{ + DEBTRACE("notifyObserverFromClone " << event << originalInstance ); +} + Dispatcher* Dispatcher::_singleton = 0; Dispatcher::~Dispatcher() @@ -104,6 +109,19 @@ void Dispatcher::dispatch2(Node* object,const std::string& event, void *somethin } } +void Dispatcher::dispatchFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent) +{ + std::pair key(originalInstance,event); + std::map< std::pair , std::set >::const_iterator it(_observers.find(key)); + if(it!=_observers.end()) + { + for(std::set::const_iterator iter=(*it).second.begin();iter!=(*it).second.end();iter++) + { + (*iter)->notifyObserverFromClone(originalInstance,event,clonedInstanceGeneratingEvent); + } + } +} + void Dispatcher::addObserver(Observer* observer,Node* object, const std::string& event) { _observers[std::pair(object,event)].insert(observer); diff --git a/src/engine/Dispatcher.hxx b/src/engine/Dispatcher.hxx index 0ae04705e..fcd4ce452 100644 --- a/src/engine/Dispatcher.hxx +++ b/src/engine/Dispatcher.hxx @@ -47,6 +47,7 @@ namespace YACS public: virtual void notifyObserver(Node* object,const std::string& event); virtual void notifyObserver2(Node* object,const std::string& event, void *something); + virtual void notifyObserverFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent); virtual ~Observer(); }; @@ -74,6 +75,7 @@ namespace YACS public: virtual void dispatch(Node* object,const std::string& event); virtual void dispatch2(Node* object,const std::string& event, void *something); + virtual void dispatchFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent); virtual void addObserver(Observer* observer,Node* object,const std::string& event); virtual void removeObserver(Observer* observer,Node* object,const std::string& event); virtual void printObservers(); diff --git a/src/engine/DynParaLoop.cxx b/src/engine/DynParaLoop.cxx index a0fa8b453..2a84e19be 100644 --- a/src/engine/DynParaLoop.cxx +++ b/src/engine/DynParaLoop.cxx @@ -372,8 +372,9 @@ void DynParaLoop::prepareInputsFromOutOfScope(int branchNb) if(portToSet)//portToSet==0 in case of portToSet==_splitterNode._dataPortToDispatch of ForEach { portToSet->put((const void *)val); - portToSet->edNotifyReferencedBy(0);//This is to indicate that somewhere somebody deals with this inputport + portToSet->edNotifyReferencedBy(nullptr,false);//This is to indicate that somewhere somebody deals with this inputport //even if no direct physical link exists. This exclusively for _execNodes[branchNb]::init on the next turn of loop. + //false is put as 2nd parameter to tell to portToSet, do not touch to the data in case of squeezeMemory. } } } diff --git a/src/engine/DynParaLoop.hxx b/src/engine/DynParaLoop.hxx index e491b5b0e..d2849265d 100644 --- a/src/engine/DynParaLoop.hxx +++ b/src/engine/DynParaLoop.hxx @@ -91,6 +91,7 @@ namespace YACS //! For the moment false is returned : impovement about it coming soon. bool isPlacementPredictableB4Run() const; void edRemoveChild(Node *node) throw(Exception); + bool isLoop() const override { return true; } virtual bool edAddChild(Node *DISOWNnode) throw(Exception); std::list edGetDirectDescendants() const; std::list getSetOfInputPort() const; @@ -112,6 +113,7 @@ namespace YACS Node * getFinalizeNode(); int getMaxLevelOfParallelism() const; void partitionRegardingDPL(const PartDefinition *pd, std::map >& zeMap); + virtual void cleanDynGraph(); protected: void buildDelegateOf(InPort * & port, OutPort *initialStart, const std::list& pointsOfView); void buildDelegateOf(std::pair& port, InPort *finalTarget, const std::list& pointsOfView); @@ -124,7 +126,6 @@ namespace YACS virtual void checkLinkPossibility(OutPort *start, const std::list& pointsOfViewStart, InPort *end, const std::list& pointsOfViewEnd) throw(Exception); protected: - void cleanDynGraph(); void prepareInputsFromOutOfScope(int branchNb); void putValueOnBranch(Any *val, unsigned branchId, bool first); TypeOfNode getIdentityOfNotifyerNode(const Node *node, unsigned& id); diff --git a/src/engine/ElementaryNode.cxx b/src/engine/ElementaryNode.cxx index 1aef1c196..d227016b9 100644 --- a/src/engine/ElementaryNode.cxx +++ b/src/engine/ElementaryNode.cxx @@ -435,10 +435,10 @@ void ElementaryNode::getReadyTasks(std::vector& tasks) std::string input_port_name = (*iter1)->getName(); // Get Port Name in master node InputPort * master_port = getInputPort(input_port_name); - for (std::set::const_iterator itt=master_port->_backLinks.begin(); itt!=master_port->_backLinks.end();itt++) + for (auto itt : master_port->_backLinks) { // Connect dataflow - getProc()->edAddDFLink((*itt),(*iter1)); + getProc()->edAddDFLink(itt.first,(*iter1)); } } diff --git a/src/engine/ForEachLoop.cxx b/src/engine/ForEachLoop.cxx index b3d87fdfa..33faee131 100644 --- a/src/engine/ForEachLoop.cxx +++ b/src/engine/ForEachLoop.cxx @@ -604,6 +604,14 @@ void ForEachLoop::exUpdateState() _passedData->checkLevel2(_execOutGoingPorts[0]); _passedData->assignAlreadyDone(_execVals); } + // clean inputs data coming from the outside in _node + set< InPort * > portsToSetVals=getAllInPortsComingFromOutsideOfCurrentScope(); + for(auto iter : portsToSetVals) + { + InputPort *curPortCasted=(InputPort *) iter;//Cast granted by ForEachLoop::buildDelegateOf(InPort) + if(!curPortCasted->canSafelySqueezeMemory())// this can appear strange ! if not safelySqueeze -> release. Nevertheless it is true. + curPortCasted->releaseData(); // these input ports have been incremented with InputPort::put into DynParaLoop::prepareInputsFromOutOfScope. So they can be released now. + } } catch(YACS::Exception& ex) { @@ -853,14 +861,15 @@ YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, u else if(_state == YACS::ACTIVATED) {//more elements to do and loop still activated _execIds[id]=_execCurrentId; - node->init(false); int posInAbs(_execCurrentId); if(_passedData) posInAbs=_passedData->toAbsId(_execCurrentId); _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false); + //forwardExecStateToOriginalBody(node); + node->init(false); _execCurrentId++; node->exUpdateState(); - forwardExecStateToOriginalBody(node); + //forwardExecStateToOriginalBody(node); _nbOfEltConsumed++; } else @@ -1119,7 +1128,6 @@ void ForEachLoop::resetState(int level) if(level==0)return; DynParaLoop::resetState(level); _execCurrentId=0; - //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object) cleanDynGraph(); } diff --git a/src/engine/ForEachLoop.hxx b/src/engine/ForEachLoop.hxx index c31f3f9ad..f89798005 100644 --- a/src/engine/ForEachLoop.hxx +++ b/src/engine/ForEachLoop.hxx @@ -208,6 +208,7 @@ namespace YACS void assignPassedResults(const std::vector& passedIds, const std::vector& passedOutputs, const std::vector& nameOfOutputs); #endif const TypeCode* getOutputPortType(const std::string& portName)const; + void cleanDynGraph() override; protected: Node *simpleClone(ComposedNode *father, bool editionOnly=true) const; void checkLinkPossibility(OutPort *start, const std::list& pointsOfViewStart, @@ -221,7 +222,6 @@ namespace YACS void getDelegateOf(std::pair& port, InPort *finalTarget, const std::list& pointsOfView) throw(Exception); void releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list& pointsOfView) throw(Exception); protected: - void cleanDynGraph(); void pushAllSequenceValues(); void createOutputOutOfScopeInterceptors(int branchNb); void prepareSequenceValues(int sizeOfSamples); diff --git a/src/engine/InPort.cxx b/src/engine/InPort.cxx index 4cdea5bb3..fd39c76d8 100644 --- a/src/engine/InPort.cxx +++ b/src/engine/InPort.cxx @@ -20,7 +20,9 @@ #include "InPort.hxx" #include "OutPort.hxx" #include "ComposedNode.hxx" + #include +#include using namespace YACS::ENGINE; using namespace std; @@ -47,13 +49,13 @@ int InPort::edGetNumberOfLinks() const void InPort::edRemoveAllLinksLinkedWithMe() throw(YACS::Exception) { - set temp(_backLinks);//edRemoveLink called after causes invalidation of set iterator. - for(set::iterator iter=temp.begin();iter!=temp.end();iter++) + set< std::pair > temp(_backLinks);//edRemoveLink called after causes invalidation of set iterator. + for(auto iter : temp) { set trueBackOutputs; - (*iter)->getAllRepresented(trueBackOutputs); - for(set::iterator iter2=trueBackOutputs.begin();iter2!=trueBackOutputs.end();iter2++) - _node->getRootNode()->edRemoveLink(*iter2,this); + iter.first->getAllRepresented(trueBackOutputs); + for( auto iter2 : trueBackOutputs ) + _node->getRootNode()->edRemoveLink(iter2,this); } _backLinks.clear(); modified(); @@ -62,18 +64,46 @@ void InPort::edRemoveAllLinksLinkedWithMe() throw(YACS::Exception) //! Returns \b physical backlinks \b NOT user backlinks. std::set InPort::edSetOutPort() const { - return _backLinks; + std::set ret; + for( auto iter : _backLinks ) + ret.insert(iter.first); + return ret; +} + +bool InPort::canSafelySqueezeMemory() const +{ + if(!isBackLinked()) + return false; + for(auto bl : _backLinks) + { + if(!bl.second) + return false; + } + return true; } -void InPort::edNotifyReferencedBy(OutPort *fromPort) +/*! + * \param [in] isLoopProof - Tells if the data coming from \a fromPort will be send again in case of \a this is initialized. This value is + * important if \a this is an InPort of a Node contained directly or not inside a Loop, ForEachLoop, OptimizerLoop. + * In this case, to optimize memory consumption (see squeezeMemory method), we need to know if data coming from \a fromPort + * will be generated again in case. + * If true (the default) it means that for that link is a link loop proof so no need to take care. If false, the link is not loop proof so + * event in the context of agressive memory management the data can't be safely released. + */ +void InPort::edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof) { - _backLinks.insert(fromPort); + auto it(std::find_if(_backLinks.begin(),_backLinks.end(),[fromPort](const std::pair& p){ return p.first==fromPort; })); + if(it!=_backLinks.end()) + _backLinks.erase(it); + _backLinks.insert(std::pair(fromPort,isLoopProof)); modified(); } void InPort::edNotifyDereferencedBy(OutPort *fromPort) { - _backLinks.erase(fromPort); + auto it(std::find_if(_backLinks.begin(),_backLinks.end(),[fromPort](const std::pair& p){ return p.first==fromPort; })); + if(it!=_backLinks.end()) + _backLinks.erase(it); modified(); } diff --git a/src/engine/InPort.hxx b/src/engine/InPort.hxx index 4778aa979..bb8a221d7 100644 --- a/src/engine/InPort.hxx +++ b/src/engine/InPort.hxx @@ -62,17 +62,19 @@ namespace YACS virtual InPort *getPublicRepresentant() { return this; } virtual int edGetNumberOfLinks() const; virtual std::set edSetOutPort() const; + bool canSafelySqueezeMemory() const; + bool isBackLinked() const { return !_backLinks.empty(); } virtual ~InPort(); virtual std::string typeName() {return "YACS__ENGINE__InPort";} protected: InPort(const InPort& other, Node *newHelder); InPort(const std::string& name, Node *node, TypeCode* type); void edRemoveAllLinksLinkedWithMe() throw(Exception); - virtual void edNotifyReferencedBy(OutPort *fromPort); + virtual void edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof = true); virtual void edNotifyDereferencedBy(OutPort *fromPort); virtual void getAllRepresentants(std::set& repr) const; protected: - std::set _backLinks; + std::set< std::pair > _backLinks; }; } } diff --git a/src/engine/InPropertyPort.cxx b/src/engine/InPropertyPort.cxx index b5a8bfb69..3d4005471 100644 --- a/src/engine/InPropertyPort.cxx +++ b/src/engine/InPropertyPort.cxx @@ -89,6 +89,13 @@ InPropertyPort::put(const void *data) throw(ConversionException) put((YACS::ENGINE::Any *)data); } +void InPropertyPort::releaseData() +{ + if(_property_data) + _property_data->decrRef(); + _property_data = nullptr; +} + void InPropertyPort::put(YACS::ENGINE::Any *data) { @@ -100,9 +107,7 @@ InPropertyPort::put(YACS::ENGINE::Any *data) std::string value = ((*seq_data)[i]["value"])->getStringValue(); exNewPropertyValue(key, value); } - - if(_property_data) - _property_data->decrRef(); + InPropertyPort::releaseData(); _property_data = data; _property_data->incrRef(); DEBTRACE("value ref count: " << _property_data->getRefCnt()); diff --git a/src/engine/InPropertyPort.hxx b/src/engine/InPropertyPort.hxx index cd0677ca6..c9cff5b45 100644 --- a/src/engine/InPropertyPort.hxx +++ b/src/engine/InPropertyPort.hxx @@ -61,7 +61,7 @@ namespace YACS virtual void *get() const; virtual void put(const void *data) throw(ConversionException); virtual void put(YACS::ENGINE::Any *data); - + void releaseData() override; protected: InPropertyPort(const InPropertyPort& other, Node *newHelder); InPropertyPort(const std::string& name, Node *node, TypeCode* type, bool canBeNull = false); diff --git a/src/engine/InputPort.cxx b/src/engine/InputPort.cxx index eaef5d34f..a0b141397 100644 --- a/src/engine/InputPort.cxx +++ b/src/engine/InputPort.cxx @@ -206,9 +206,9 @@ InputPort *ProxyPort::clone(Node *newHelder) const throw Exception("ProxyPort::clone : internal error - should never happened"); } -void ProxyPort::edNotifyReferencedBy(OutPort *fromPort) +void ProxyPort::edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof) { - _port->edNotifyReferencedBy(fromPort); + _port->edNotifyReferencedBy(fromPort,isLoopProof); } void ProxyPort::edNotifyDereferencedBy(OutPort *fromPort) @@ -250,6 +250,11 @@ void *ProxyPort::get() const return _port->get(); } +void ProxyPort::releaseData() +{ + _port->releaseData(); +} + void ProxyPort::put(const void *data) throw(ConversionException) { _port->put(data); diff --git a/src/engine/InputPort.hxx b/src/engine/InputPort.hxx index 47326878c..91b368fe5 100644 --- a/src/engine/InputPort.hxx +++ b/src/engine/InputPort.hxx @@ -74,6 +74,7 @@ namespace YACS virtual bool isEmpty(); virtual void *get() const = 0; + virtual void releaseData() = 0; virtual void put(const void *data) throw(ConversionException) = 0; virtual std::string dump(); virtual std::string getHumanRepr(); @@ -83,6 +84,7 @@ namespace YACS protected: InputPort(const InputPort& other, Node *newHelder); InputPort(const std::string& name, Node *node, TypeCode* type, bool canBeNull = false); + void releaseDataUnsafe(); protected: Any *_initValue; std::string _stringRef; @@ -102,7 +104,7 @@ namespace YACS void edRemoveAllLinksLinkedWithMe() throw(Exception); InputPort *clone(Node *newHelder) const; - void edNotifyReferencedBy(OutPort *fromPort); + void edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof = true) override; void edNotifyDereferencedBy(OutPort *fromPort); std::set edSetOutPort() const; #ifdef NOCOVARIANT @@ -112,6 +114,7 @@ namespace YACS #endif void *get() const; virtual void put(const void *data) throw(ConversionException) ; + void releaseData() override; int edGetNumberOfLinks() const; bool isIntermediate() const { return true; } void exRestoreInit(); diff --git a/src/engine/Loop.cxx b/src/engine/Loop.cxx index 1634e1b49..7f0a29097 100644 --- a/src/engine/Loop.cxx +++ b/src/engine/Loop.cxx @@ -85,10 +85,16 @@ InputPort *InputPort4DF2DS::clone(Node *newHelder) const throw Exception("InputPort4DF2DS::clone : internal error"); } -void InputPort4DF2DS::put(Any *data) +void InputPort4DF2DS::releaseData() { if(_data) _data->decrRef(); + _data = nullptr; +} + +void InputPort4DF2DS::put(Any *data) +{ + InputPort4DF2DS::releaseData(); _data=data; _data->incrRef(); } diff --git a/src/engine/Loop.hxx b/src/engine/Loop.hxx index b606029f4..4b67377da 100644 --- a/src/engine/Loop.hxx +++ b/src/engine/Loop.hxx @@ -43,6 +43,7 @@ namespace YACS public: InputPort4DF2DS(DFToDSForLoop *node, TypeCode* type); void getAllRepresentants(std::set& repr) const; + void releaseData() override; void put(const void *data) throw(ConversionException); InputPort *clone(Node *newHelder) const; void *get() const throw(Exception); @@ -162,6 +163,7 @@ namespace YACS //Node* DISOWNnode is a SWIG notation to indicate that the ownership of the node is transfered to C++ Node *edSetNode(Node *DISOWNnode); virtual bool edAddChild(Node *DISOWNnode) throw(Exception); + bool isLoop() const override { return true; } Node *edRemoveNode(); virtual void checkBasicConsistency() const throw(Exception); //! Returns the port which value is used to take decision about the continuation of the loop. diff --git a/src/engine/Node.cxx b/src/engine/Node.cxx index f7d8196ad..fb7d4e841 100644 --- a/src/engine/Node.cxx +++ b/src/engine/Node.cxx @@ -86,6 +86,7 @@ Node::Node(const Node& other, ComposedNode *father):_inGate(this),_outGate(this) // Every node has an InPropertyPort _inPropertyPort = new InPropertyPort("__InPropertyPort__Node__YACS_", this, Runtime::_tc_propvec); + _eventReceiver=const_cast(&other); } Node::~Node() @@ -683,8 +684,15 @@ void Node::applyDPLScope(ComposedNode *gfn) void Node::sendEvent(const std::string& event) { DEBTRACE("Node::sendEvent " << event); - Dispatcher* disp=Dispatcher::getDispatcher(); - disp->dispatch(this,event); + Dispatcher *disp(Dispatcher::getDispatcher()); + if(!_eventReceiver) + { + disp->dispatch(this,event); + } + else + { + disp->dispatchFromClone(_eventReceiver,event,this); + } } //! emit notification to all observers registered with the dispatcher diff --git a/src/engine/Node.hxx b/src/engine/Node.hxx index fbd227670..9ec273ff9 100644 --- a/src/engine/Node.hxx +++ b/src/engine/Node.hxx @@ -96,6 +96,7 @@ namespace YACS int _numId; std::string _implementation; std::map _propertyMap; + Node *_eventReceiver=nullptr; protected: Node(const std::string& name); Node(const Node& other, ComposedNode *father); diff --git a/src/engine/OptimizerLoop.hxx b/src/engine/OptimizerLoop.hxx index e909f42b7..5cad3c4f9 100644 --- a/src/engine/OptimizerLoop.hxx +++ b/src/engine/OptimizerLoop.hxx @@ -105,7 +105,6 @@ namespace YACS OutPort *getOutPort(const std::string& name) const throw(Exception); OutputPort *getOutputPort(const std::string& name) const throw(Exception); YACS::Event finalize(); - protected: virtual YACS::Event updateStateOnFailedEventFrom(Node *node, const Executor *execInst); void buildDelegateOf(InPort * & port, OutPort *initialStart, const std::list& pointsOfView); diff --git a/src/engine/Test/RuntimeForEngineTest.cxx b/src/engine/Test/RuntimeForEngineTest.cxx index efdd7f865..d97035268 100644 --- a/src/engine/Test/RuntimeForEngineTest.cxx +++ b/src/engine/Test/RuntimeForEngineTest.cxx @@ -41,6 +41,13 @@ TestElemInputPort::TestElemInputPort(const TestElemInputPort& other, Node *newHe { } +void TestElemInputPort::releaseData() +{ + stringstream msg; + msg << "Not implemented (" << __FILE__ << ":" << __LINE__ << ")"; + throw Exception(msg.str()); +} + void TestElemInputPort::put(const void *data) throw(ConversionException) { cerr << _name << endl; diff --git a/src/engine/Test/RuntimeForEngineTest.hxx b/src/engine/Test/RuntimeForEngineTest.hxx index ba0693ee2..e8c3a8912 100644 --- a/src/engine/Test/RuntimeForEngineTest.hxx +++ b/src/engine/Test/RuntimeForEngineTest.hxx @@ -44,6 +44,7 @@ namespace YACS public: TestElemInputPort(const std::string& name, Node *node, TypeCode* type); TestElemInputPort(const TestElemInputPort& other, Node *newHelder); + void releaseData() override; void put(const void *data) throw(ConversionException); InputPort *clone(Node *newHelder) const; void *get() const throw(Exception); diff --git a/src/engine/Test/ToyNode.cxx b/src/engine/Test/ToyNode.cxx index c971f86b2..f489ed901 100644 --- a/src/engine/Test/ToyNode.cxx +++ b/src/engine/Test/ToyNode.cxx @@ -95,10 +95,16 @@ InputPort *InputToyPort::clone(Node *newHelder) const return new InputToyPort(*this,newHelder); } -void InputToyPort::put(Any *data) +void InputToyPort::releaseData() { if(_data) _data->decrRef(); + _data=nullptr; +} + +void InputToyPort::put(Any *data) +{ + InputToyPort::releaseData(); _data=data; _data->incrRef(); } @@ -599,10 +605,16 @@ void InputLimitPort::edRemoveManInit() InputPort::edRemoveManInit(); } -void InputLimitPort::put(Any *data) +void InputLimitPort::releaseData() { if(_data) _data->decrRef(); + _data=nullptr; +} + +void InputLimitPort::put(Any *data) +{ + InputLimitPort::releaseData(); _data=data; _data->incrRef(); } diff --git a/src/engine/Test/ToyNode.hxx b/src/engine/Test/ToyNode.hxx index 820be259f..4c4632954 100644 --- a/src/engine/Test/ToyNode.hxx +++ b/src/engine/Test/ToyNode.hxx @@ -35,6 +35,7 @@ namespace YACS public: InputToyPort(const InputToyPort& other, Node *newHelder); InputToyPort(const std::string& name, Node *node); + void releaseData() override; void put(const void *data) throw(ConversionException); InputPort *clone(Node *newHelder) const; bool edIsManuallyInitialized() const; @@ -201,6 +202,7 @@ namespace YACS bool edIsManuallyInitialized() const; void *get() const throw(Exception); void edRemoveManInit(); + void releaseData() override; void put(Any *data); ~InputLimitPort(); Any *getAny() { return _data; } diff --git a/src/runtime/CORBAPorts.cxx b/src/runtime/CORBAPorts.cxx index 989fac82e..6cd9bcbdc 100644 --- a/src/runtime/CORBAPorts.cxx +++ b/src/runtime/CORBAPorts.cxx @@ -161,6 +161,10 @@ void display(CORBA::Any* data) } } +void InputCorbaPort::releaseData() +{//do nothing - to be implemented +} + void InputCorbaPort::put(CORBA::Any *data) throw (ConversionException) { #ifdef REFCNT diff --git a/src/runtime/CORBAPorts.hxx b/src/runtime/CORBAPorts.hxx index d36f089f8..ddf297605 100644 --- a/src/runtime/CORBAPorts.hxx +++ b/src/runtime/CORBAPorts.hxx @@ -51,6 +51,7 @@ namespace YACS void edRemoveManInit(); virtual void put(const void *data) throw(ConversionException); void put(CORBA::Any *data) throw (ConversionException); + void releaseData() override; InputPort *clone(Node *newHelder) const; void *get() const throw(Exception); virtual bool isEmpty(); diff --git a/src/runtime/CppPorts.cxx b/src/runtime/CppPorts.cxx index b464449d1..a46f86765 100644 --- a/src/runtime/CppPorts.cxx +++ b/src/runtime/CppPorts.cxx @@ -67,10 +67,16 @@ void InputCppPort::put(const void *data) throw(ConversionException) put((YACS::ENGINE::Any *)data); } -void InputCppPort::put(YACS::ENGINE::Any *data) throw(ConversionException) +void InputCppPort::releaseData() { if(_data) _data->decrRef(); + _data=nullptr; +} + +void InputCppPort::put(YACS::ENGINE::Any *data) throw(ConversionException) +{ + releaseData(); _data=data; _data->incrRef(); DEBTRACE("value ref count: " << _data->getRefCnt()); diff --git a/src/runtime/CppPorts.hxx b/src/runtime/CppPorts.hxx index 262152aa7..28fb44e6d 100644 --- a/src/runtime/CppPorts.hxx +++ b/src/runtime/CppPorts.hxx @@ -46,6 +46,7 @@ namespace YACS void edRemoveManInit(); virtual void put(const void *data) throw(ConversionException); void put(YACS::ENGINE::Any *data) throw(ConversionException); + void releaseData() override; InputPort *clone(Node *newHelder) const; virtual YACS::ENGINE::Any * getCppObj() const; void *get() const throw(Exception); diff --git a/src/runtime/PythonNode.cxx b/src/runtime/PythonNode.cxx index c09c5a115..0227c2adf 100644 --- a/src/runtime/PythonNode.cxx +++ b/src/runtime/PythonNode.cxx @@ -304,7 +304,7 @@ void PythonEntry::commonRemoteLoad(InlineNode *reqNode) commonRemoteLoadPart3(reqNode,objContainer,isInitializeRequested); } -PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father) +PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze) { _implementation=IMPL_NAME; { @@ -542,6 +542,8 @@ void PythonNode::executeRemote() _errorDetails=ex.what(); throw; } + if(_autoSqueeze) + squeezeMemoryRemote(); } // if(!CORBA::is_nil(_pynode)) @@ -654,12 +656,54 @@ void PythonNode::executeLocal() _errorDetails=ex.what(); throw; } - + if(_autoSqueeze) + squeezeMemory(); DEBTRACE( "-----------------End PyNode::outputs-----------------" ); } DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" ); } +void PythonNode::squeezeMemorySafe() +{ + AutoGIL agil; + if(_mode==PythonNode::REMOTE_NAME) + this->squeezeMemoryRemote(); + else + this->squeezeMemory(); +} + +void PythonNode::squeezeMemory() +{ + for(auto p : _setOfInputPort) + { + PyDict_DelItemString(_context,p->getName().c_str()); + InputPyPort *p2(static_cast(p)); + if(p2->canSafelySqueezeMemory()) + p2->put(Py_None); + } + for(auto p : _setOfOutputPort) + { + PyDict_DelItemString(_context,p->getName().c_str()); + OutputPyPort *p2(static_cast(p)); + p2->putWithoutForward(Py_None); + } +} + +void PythonNode::squeezeMemoryRemote() +{ + for(auto p : _setOfInputPort) + { + InputPyPort *p2(static_cast(p)); + if(p2->canSafelySqueezeMemory()) + p2->put(Py_None); + } + for(auto p : _setOfOutputPort) + { + OutputPyPort *p2(static_cast(p)); + p2->putWithoutForward(Py_None); + } +} + std::string PythonNode::getContainerLog() { return PythonEntry::GetContainerLog(_mode,_container,this); diff --git a/src/runtime/PythonNode.hxx b/src/runtime/PythonNode.hxx index fdac25950..8e05906df 100644 --- a/src/runtime/PythonNode.hxx +++ b/src/runtime/PythonNode.hxx @@ -85,6 +85,12 @@ namespace YACS PythonNode* cloneNode(const std::string& name); virtual std::string typeName() { return "YACS__ENGINE__PythonNode"; } void applyDPLScope(ComposedNode *gfn); + void setSqueezeStatus(bool sqStatus) { _autoSqueeze=sqStatus; } + bool getSqueezeStatus() const { return _autoSqueeze; } + void squeezeMemorySafe(); + protected: + void squeezeMemory(); + void squeezeMemoryRemote(); public: static const char KIND[]; static const char IMPL_NAME[]; @@ -92,6 +98,7 @@ namespace YACS static const char REMOTE_NAME[]; static const char DPL_INFO_NAME[]; protected: + bool _autoSqueeze = false; Engines::PyScriptNode_var _pynode; }; diff --git a/src/runtime/PythonPorts.cxx b/src/runtime/PythonPorts.cxx index 2da1567ae..dfe676e1f 100644 --- a/src/runtime/PythonPorts.cxx +++ b/src/runtime/PythonPorts.cxx @@ -34,6 +34,8 @@ using namespace std; void releasePyObj(PyObject* data) { + if(!data) + return ; DEBTRACE( "data refcnt: " << data->ob_refcnt ); if (PyObject_HasAttrString(data, (char*)"_is_a")) { @@ -159,11 +161,23 @@ void InputPyPort::put(const void *data) throw(ConversionException) put((PyObject *)data); } -void InputPyPort::put(PyObject *data) throw(ConversionException) +void InputPyPort::releaseDataUnsafe() { - InterpreterUnlocker l; releasePyObj(_data); Py_XDECREF(_data); + _data = nullptr; +} + +void InputPyPort::releaseData() +{ + InterpreterUnlocker l; + InputPyPort::releaseDataUnsafe(); +} + +void InputPyPort::put(PyObject *data) throw(ConversionException) +{ + InterpreterUnlocker l; + InputPyPort::releaseDataUnsafe(); _data = data; _stringRef=""; Py_INCREF(_data); @@ -315,9 +329,8 @@ void OutputPyPort::put(const void *data) throw(ConversionException) put((PyObject *)data); } -void OutputPyPort::put(PyObject *data) throw(ConversionException) +void OutputPyPort::putWithoutForward(PyObject *data) throw(ConversionException) { - InputPort *p; DEBTRACE( "OutputPyPort::put.ob refcnt: " << data->ob_refcnt ); #ifdef _DEVDEBUG_ PyObject_Print(data,stderr,Py_PRINT_RAW); @@ -328,6 +341,11 @@ void OutputPyPort::put(PyObject *data) throw(ConversionException) _data = data; Py_INCREF(_data); //no registerPyObj : we steal the output reference of the node +} + +void OutputPyPort::put(PyObject *data) throw(ConversionException) +{ + putWithoutForward(data); DEBTRACE( "OutputPyPort::put.ob refcnt: " << data->ob_refcnt ); OutputPort::put(data); } diff --git a/src/runtime/PythonPorts.hxx b/src/runtime/PythonPorts.hxx index b391e0028..9945a1c43 100644 --- a/src/runtime/PythonPorts.hxx +++ b/src/runtime/PythonPorts.hxx @@ -79,6 +79,7 @@ namespace YACS bool edIsManuallyInitialized() const; void edRemoveManInit(); virtual void put(const void *data) throw(ConversionException); + void releaseData() override; void put(PyObject *data) throw(ConversionException); InputPort *clone(Node *newHelder) const; //special typedef PyObj used in SWIG to increment ref count on output @@ -93,7 +94,8 @@ namespace YACS virtual std::string typeName() {return "YACS__ENGINE__InputPyPort";} virtual std::string valToStr(); virtual void valFromStr(std::string valstr); - + protected: + void releaseDataUnsafe(); protected: PyObject* _data; PyObject* _initData; @@ -107,6 +109,7 @@ namespace YACS OutputPyPort(const OutputPyPort& other, Node *newHelder); ~OutputPyPort(); virtual void put(const void *data) throw(ConversionException); + void putWithoutForward(PyObject *data) throw(ConversionException); void put(PyObject *data) throw(ConversionException); OutputPort *clone(Node *newHelder) const; virtual PyObject * get() const; diff --git a/src/runtime/XMLPorts.cxx b/src/runtime/XMLPorts.cxx index 8e8bba3b3..606f07d61 100644 --- a/src/runtime/XMLPorts.cxx +++ b/src/runtime/XMLPorts.cxx @@ -70,6 +70,10 @@ void InputXmlPort::put(const char *data) throw (ConversionException) _data = data; } +void InputXmlPort::releaseData() +{//nothing because _data has no ref counter +} + bool InputXmlPort::isEmpty() { return _data.empty(); diff --git a/src/runtime/XMLPorts.hxx b/src/runtime/XMLPorts.hxx index 66ed9c877..61836d3c9 100644 --- a/src/runtime/XMLPorts.hxx +++ b/src/runtime/XMLPorts.hxx @@ -43,6 +43,7 @@ namespace YACS void edRemoveManInit(); virtual void put(const void *data) throw (ConversionException); void put(const char *data) throw (ConversionException); + void releaseData() override; InputPort *clone(Node *newHelder) const; virtual const char * getXml() const; void *get() const throw(Exception); -- 2.30.2