Salome HOME
Squeeze memory of process hosting YACS graph
authorAnthony Geay <anthony.geay@edf.fr>
Fri, 25 Jan 2019 09:00:54 +0000 (10:00 +0100)
committerAnthony Geay <anthony.geay@edf.fr>
Mon, 29 Jul 2019 11:07:44 +0000 (13:07 +0200)
38 files changed:
src/engine/AnyInputPort.cxx
src/engine/AnyInputPort.hxx
src/engine/ComposedNode.cxx
src/engine/ComposedNode.hxx
src/engine/ConditionInputPort.cxx
src/engine/ConditionInputPort.hxx
src/engine/Dispatcher.cxx
src/engine/Dispatcher.hxx
src/engine/DynParaLoop.cxx
src/engine/DynParaLoop.hxx
src/engine/ElementaryNode.cxx
src/engine/ForEachLoop.cxx
src/engine/ForEachLoop.hxx
src/engine/InPort.cxx
src/engine/InPort.hxx
src/engine/InPropertyPort.cxx
src/engine/InPropertyPort.hxx
src/engine/InputPort.cxx
src/engine/InputPort.hxx
src/engine/Loop.cxx
src/engine/Loop.hxx
src/engine/Node.cxx
src/engine/Node.hxx
src/engine/OptimizerLoop.hxx
src/engine/Test/RuntimeForEngineTest.cxx
src/engine/Test/RuntimeForEngineTest.hxx
src/engine/Test/ToyNode.cxx
src/engine/Test/ToyNode.hxx
src/runtime/CORBAPorts.cxx
src/runtime/CORBAPorts.hxx
src/runtime/CppPorts.cxx
src/runtime/CppPorts.hxx
src/runtime/PythonNode.cxx
src/runtime/PythonNode.hxx
src/runtime/PythonPorts.cxx
src/runtime/PythonPorts.hxx
src/runtime/XMLPorts.cxx
src/runtime/XMLPorts.hxx

index 2c914bbe1cf2d8648e798c7f0942f33cbfd913e1..6fa9acba5c4421045ae954807b21f557a87043dc 100644 (file)
@@ -84,16 +84,29 @@ void AnyInputPort::exRestoreInit()
   _value->incrRef();
 }
 
-void AnyInputPort::put(Any *data)
+void AnyInputPort::releaseDataUnsafe()
 {
-  YACS::BASES::AutoLocker<YACS::BASES::Mutex> lock(&_mutex);
   if(_value)
     _value->decrRef();
+  _value = nullptr;
+}
+
+void AnyInputPort::releaseData()
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> lock(&_mutex);
+  releaseDataUnsafe();
+}
+
+void AnyInputPort::put(Any *data)
+{
+  YACS::BASES::AutoLocker<YACS::BASES::Mutex> lock(&_mutex);
+  releaseDataUnsafe();
   _value=data;
-  if (_value) {
-    _value->incrRef();
-    DEBTRACE("value ref count: " << _value->getRefCnt());
-  }
+  if (_value)
+    {
+      _value->incrRef();
+      DEBTRACE("value ref count: " << _value->getRefCnt());
+    }
 }
 
 bool AnyInputPort::isEmpty()
index c276a8131efe388200764637814bcc1b336aa6ad..1791e3e8b652cac87b43e5ff0034306ba964b5b9 100644 (file)
@@ -40,6 +40,7 @@ namespace YACS
       void exRestoreInit();
       Any *getValue() const { return _value; }
       int getIntValue() const { return _value ? _value->getIntValue():0; }
+      void releaseData() override;
       void put(Any *data);
       void *get() const;
       virtual std::string getAsString();
@@ -48,6 +49,8 @@ namespace YACS
       InputPort *clone(Node *newHelder) const;
       std::string dump();
       virtual std::string typeName() {return "YACS__ENGINE__AnyInputPort";}
+    protected:
+      void releaseDataUnsafe();
     protected:
       Any *_value;
     private:
index cc9f49fb138e6a55603fb0656d9f594bf8563788..ccb4f7917c93e4bee9fdf8d7c05e69aab0fb8bee 100644 (file)
@@ -307,13 +307,17 @@ bool ComposedNode::edAddLink(OutPort *start, InPort *end) throw(YACS::Exception)
     iterS=end->getNode()->_father;
 
   InPort *currentPortI=end;
+  bool isLoopProofLink(true),isFirstTurn(true);
   while(iterS!=lwstCmnAnctr)
     {
+      if(!isFirstTurn && iterS->isLoop())// isFirstTurn is a way to filter special inputs like nbBranches, splitPort... These special inputs are loopProof -> they must not be realeased by ForEachLoop::exUpdateState
+        isLoopProofLink=false;
       iterS->buildDelegateOf(currentPortI, start, allAscendanceOfNodeStart);
       iterS=iterS->_father;
+      isFirstTurn=false;
     }
   bool ret=(pO.first)->addInPort(currentPortI);
-  end->edNotifyReferencedBy(pO.second);
+  end->edNotifyReferencedBy(pO.second,isLoopProofLink);
   return ret;
 }
 
index 0d6e78314cc14e08443b69d5cd72e9d662354d9f..68f5456946adaf16f3d14978fad8a1abac2f8f0a 100644 (file)
@@ -76,6 +76,7 @@ namespace YACS
       void edRemoveLink(OutPort *start, InPort *end) throw(Exception);
       void edRemoveLink(OutGate *start, InGate *end) throw(Exception);
       virtual bool isRepeatedUnpredictablySeveralTimes() const { return false; }
+      virtual bool isLoop() const { return false; }
       virtual std::list<Node *> edGetDirectDescendants() const =  0;
       virtual void removeRecursivelyRedundantCL();
       std::list<ElementaryNode *> getRecursiveConstituents() const;
index 48ee6320326a9eed0047a031408c54979aa14b58..d5dc9b35b99baa73f3b669a4bcde67192a4d31fd 100644 (file)
@@ -73,7 +73,7 @@ bool ConditionInputPort::isLinkedOutOfScope() const
   return _outOfScopeBackLink!=0;
 }
 
-void ConditionInputPort::edNotifyReferencedBy(OutPort *fromPort)
+void ConditionInputPort::edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof)
 {
   if(!((ComposedNode*)(_node))->isInMyDescendance(fromPort->getNode()))
     {
@@ -81,7 +81,7 @@ void ConditionInputPort::edNotifyReferencedBy(OutPort *fromPort)
         throw Exception("ConditionInputPort::edNotifyReferenced : already linked from outside");
       _outOfScopeBackLink=fromPort;
     }
-  InputPort::edNotifyReferencedBy(fromPort);
+  InputPort::edNotifyReferencedBy(fromPort,isLoopProof);
 }
 
 void ConditionInputPort::edNotifyDereferencedBy(OutPort *fromPort)
@@ -104,10 +104,16 @@ void ConditionInputPort::put(const void *data) throw(ConversionException)
   put((Any*)data);
 }
 
-void ConditionInputPort::put(Any *data) throw(ConversionException)
+void ConditionInputPort::releaseData()
 {
   if(_value)
     _value->decrRef();
+  _value=nullptr;
+}
+
+void ConditionInputPort::put(Any *data) throw(ConversionException)
+{
+  ConditionInputPort::releaseData();
   _value=data;
   _value->incrRef();
 }    
index c1467b3e384c0019f22ebb20c2639c7d8e16f0e7..d96918c06a0371c459cd8443318efec89fa314cd 100644 (file)
@@ -42,11 +42,12 @@ namespace YACS
       void exSaveInit();
       void exRestoreInit();
       bool isLinkedOutOfScope() const;
-      void edNotifyReferencedBy(OutPort *fromPort);
+      void edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof) override;
       void edNotifyDereferencedBy(OutPort *fromPort);
       void *get() const;
       virtual void put(const void *data) throw(ConversionException);
       void put(Any *data) throw(ConversionException);
+      void releaseData() override;
       std::string dump();
       virtual std::string getAsString();
     protected:
index 95afd669d666ab24965db48ba14f85ebc02ed859..03c0276a99dd495a510ed172c11ed714f9318903 100644 (file)
@@ -40,6 +40,11 @@ void Observer::notifyObserver2(Node* object,const std::string& event, void *some
   DEBTRACE("notifyObserver2 " << event << object );
 }
 
+void Observer::notifyObserverFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent)
+{
+  DEBTRACE("notifyObserverFromClone " << event << originalInstance );
+}
+
 Dispatcher* Dispatcher::_singleton = 0;
 
 Dispatcher::~Dispatcher()
@@ -104,6 +109,19 @@ void Dispatcher::dispatch2(Node* object,const std::string& event, void *somethin
     }
 }
 
+void Dispatcher::dispatchFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent)
+{
+  std::pair<Node*,std::string> key(originalInstance,event);
+  std::map< std::pair<Node*,std::string> , std::set<Observer*> >::const_iterator it(_observers.find(key));
+  if(it!=_observers.end())
+    {
+      for(std::set<Observer*>::const_iterator iter=(*it).second.begin();iter!=(*it).second.end();iter++)
+        {
+          (*iter)->notifyObserverFromClone(originalInstance,event,clonedInstanceGeneratingEvent);
+        }
+    }
+}
+
 void Dispatcher::addObserver(Observer* observer,Node* object, const std::string& event)
 {
   _observers[std::pair<Node*,std::string>(object,event)].insert(observer);
index 0ae04705e800519b91fc667438a3f262f57e08e2..fcd4ce4526f69650b5ffb03a64121ea794925c14 100644 (file)
@@ -47,6 +47,7 @@ namespace YACS
     public:
       virtual void notifyObserver(Node* object,const std::string& event);
       virtual void notifyObserver2(Node* object,const std::string& event, void *something);
+      virtual void notifyObserverFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent);
       virtual ~Observer();
     };
 
@@ -74,6 +75,7 @@ namespace YACS
     public:
       virtual void dispatch(Node* object,const std::string& event);
       virtual void dispatch2(Node* object,const std::string& event, void *something);
+      virtual void dispatchFromClone(Node *originalInstance, const std::string& event, Node *clonedInstanceGeneratingEvent);
       virtual void addObserver(Observer* observer,Node* object,const std::string& event);
       virtual void removeObserver(Observer* observer,Node* object,const std::string& event);
       virtual void printObservers();
index a0fa8b4535e25972ccc7c07cb991ded7765fa1bb..2a84e19beaf5375d62b0059492cf2a01cc225c19 100644 (file)
@@ -372,8 +372,9 @@ void DynParaLoop::prepareInputsFromOutOfScope(int branchNb)
       if(portToSet)//portToSet==0 in case of portToSet==_splitterNode._dataPortToDispatch of ForEach
         {
           portToSet->put((const void *)val);
-          portToSet->edNotifyReferencedBy(0);//This is to indicate that somewhere somebody deals with this inputport
+          portToSet->edNotifyReferencedBy(nullptr,false);//This is to indicate that somewhere somebody deals with this inputport
           //even if no direct physical link exists. This exclusively for _execNodes[branchNb]::init on the next turn of loop.
+          //false is put as 2nd parameter to tell to portToSet, do not touch to the data in case of squeezeMemory.
         }
     }
 }
index e491b5b0e860b0a2fd3fb51d1e5efcf88c0dc999..d2849265df04e1aa6039de48cec1e1afb7f4f135 100644 (file)
@@ -91,6 +91,7 @@ namespace YACS
       //! For the moment false is returned : impovement about it coming soon.
       bool isPlacementPredictableB4Run() const;
       void edRemoveChild(Node *node) throw(Exception);
+      bool isLoop() const override { return true; }
       virtual bool edAddChild(Node *DISOWNnode) throw(Exception);
       std::list<Node *> edGetDirectDescendants() const;
       std::list<InputPort *> getSetOfInputPort() const;
@@ -112,6 +113,7 @@ namespace YACS
       Node * getFinalizeNode();
       int getMaxLevelOfParallelism() const;
       void partitionRegardingDPL(const PartDefinition *pd, std::map<ComposedNode *, YACS::BASES::AutoRefCnt<PartDefinition> >& zeMap);
+      virtual void cleanDynGraph();
     protected:
       void buildDelegateOf(InPort * & port, OutPort *initialStart, const std::list<ComposedNode *>& pointsOfView);
       void buildDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView);
@@ -124,7 +126,6 @@ namespace YACS
       virtual void checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
                                        InPort *end, const std::list<ComposedNode *>& pointsOfViewEnd) throw(Exception);
     protected:
-      void cleanDynGraph();
       void prepareInputsFromOutOfScope(int branchNb);
       void putValueOnBranch(Any *val, unsigned branchId, bool first);
       TypeOfNode getIdentityOfNotifyerNode(const Node *node, unsigned& id);
index 1aef1c19674269a51a85a978a9119b0159ac93a1..d227016b9bfb3542e7209a44ed99752a91644b43 100644 (file)
@@ -435,10 +435,10 @@ void ElementaryNode::getReadyTasks(std::vector<Task *>& tasks)
           std::string input_port_name = (*iter1)->getName();
           // Get Port Name in master node
           InputPort * master_port = getInputPort(input_port_name);
-          for (std::set<OutPort *>::const_iterator itt=master_port->_backLinks.begin(); itt!=master_port->_backLinks.end();itt++)
+          for (auto itt : master_port->_backLinks)
           {
             // Connect dataflow
-            getProc()->edAddDFLink((*itt),(*iter1));
+            getProc()->edAddDFLink(itt.first,(*iter1));
           }
         }
 
index b3d87fdfa6d7ab368b5346968d622e9fd22eb995..33faee131cdf0e5c8d1b13eaff2b9a680c739f7b 100644 (file)
@@ -604,6 +604,14 @@ void ForEachLoop::exUpdateState()
               _passedData->checkLevel2(_execOutGoingPorts[0]);
               _passedData->assignAlreadyDone(_execVals);
             }
+          // clean inputs data coming from the outside in _node
+          set< InPort * > portsToSetVals=getAllInPortsComingFromOutsideOfCurrentScope();
+          for(auto iter : portsToSetVals)
+            {
+              InputPort *curPortCasted=(InputPort *) iter;//Cast granted by ForEachLoop::buildDelegateOf(InPort)
+              if(!curPortCasted->canSafelySqueezeMemory())// this can appear strange ! if not safelySqueeze -> release. Nevertheless it is true.
+                curPortCasted->releaseData();             // these input ports have been incremented with InputPort::put into DynParaLoop::prepareInputsFromOutOfScope. So they can be released now.
+            }
         }
       catch(YACS::Exception& ex)
         {
@@ -853,14 +861,15 @@ YACS::Event ForEachLoop::updateStateForWorkNodeOnFinishedEventFrom(Node *node, u
   else if(_state == YACS::ACTIVATED)
     {//more elements to do and loop still activated
       _execIds[id]=_execCurrentId;
-      node->init(false);
       int posInAbs(_execCurrentId);
       if(_passedData)
         posInAbs=_passedData->toAbsId(_execCurrentId);
       _splitterNode.putSplittedValueOnRankTo(posInAbs,id,false);
+      //forwardExecStateToOriginalBody(node);
+      node->init(false);
       _execCurrentId++;
       node->exUpdateState();
-      forwardExecStateToOriginalBody(node);
+      //forwardExecStateToOriginalBody(node);
       _nbOfEltConsumed++;
     }
   else
@@ -1119,7 +1128,6 @@ void ForEachLoop::resetState(int level)
   if(level==0)return;
   DynParaLoop::resetState(level);
   _execCurrentId=0;
-  //Note: cleanDynGraph is not a virtual method (must be called from ForEachLoop object) 
   cleanDynGraph();
 }
 
index c31f3f9ad777fc718c80827498d15b9ce5974d87..f897980057b94c276f1abb9cbc2ffb0a686e4a05 100644 (file)
@@ -208,6 +208,7 @@ namespace YACS
       void assignPassedResults(const std::vector<unsigned int>& passedIds, const std::vector<SequenceAny *>& passedOutputs, const std::vector<std::string>& nameOfOutputs);
 #endif
      const TypeCode* getOutputPortType(const std::string& portName)const;
+      void cleanDynGraph() override;
     protected:
       Node *simpleClone(ComposedNode *father, bool editionOnly=true) const;
       void checkLinkPossibility(OutPort *start, const std::list<ComposedNode *>& pointsOfViewStart,
@@ -221,7 +222,6 @@ namespace YACS
       void getDelegateOf(std::pair<OutPort *, OutPort *>& port, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(Exception);
       void releaseDelegateOf(OutPort *portDwn, OutPort *portUp, InPort *finalTarget, const std::list<ComposedNode *>& pointsOfView) throw(Exception);
     protected:
-      void cleanDynGraph();
       void pushAllSequenceValues();
       void createOutputOutOfScopeInterceptors(int branchNb);
       void prepareSequenceValues(int sizeOfSamples);
index 4cdea5bb3122f98aa39dad6e2fb25cbb963a32c1..fd39c76d873041aab38dd76ea4818ec6734fd937 100644 (file)
@@ -20,7 +20,9 @@
 #include "InPort.hxx"
 #include "OutPort.hxx"
 #include "ComposedNode.hxx"
+
 #include <iostream>
+#include <algorithm>
 
 using namespace YACS::ENGINE;
 using namespace std;
@@ -47,13 +49,13 @@ int InPort::edGetNumberOfLinks() const
 
 void InPort::edRemoveAllLinksLinkedWithMe() throw(YACS::Exception)
 {
-  set<OutPort *> temp(_backLinks);//edRemoveLink called after causes invalidation of set iterator.
-  for(set<OutPort *>::iterator iter=temp.begin();iter!=temp.end();iter++)
+  set< std::pair<OutPort *,bool> > temp(_backLinks);//edRemoveLink called after causes invalidation of set iterator.
+  for(auto iter : temp)
     {
       set<OutPort *> trueBackOutputs;
-      (*iter)->getAllRepresented(trueBackOutputs);
-      for(set<OutPort *>::iterator iter2=trueBackOutputs.begin();iter2!=trueBackOutputs.end();iter2++)
-        _node->getRootNode()->edRemoveLink(*iter2,this);
+      iter.first->getAllRepresented(trueBackOutputs);
+      for( auto iter2 : trueBackOutputs )
+        _node->getRootNode()->edRemoveLink(iter2,this);
     }
   _backLinks.clear();
   modified();
@@ -62,18 +64,46 @@ void InPort::edRemoveAllLinksLinkedWithMe() throw(YACS::Exception)
 //! Returns \b physical backlinks \b NOT user backlinks.
 std::set<OutPort *> InPort::edSetOutPort() const
 {
-  return _backLinks;
+  std::set<OutPort *> ret;
+  for( auto iter : _backLinks )
+    ret.insert(iter.first);
+  return ret;
+}
+
+bool InPort::canSafelySqueezeMemory() const
+{
+  if(!isBackLinked())
+    return false;
+  for(auto bl : _backLinks)
+    {
+      if(!bl.second)
+        return false;
+    }
+  return true;
 }
 
-void InPort::edNotifyReferencedBy(OutPort *fromPort)
+/*!
+ * \param [in] isLoopProof - Tells if the data coming from \a fromPort will be send again in case of \a this is initialized. This value is
+ *                           important if \a this is an InPort of a Node contained directly or not inside a Loop, ForEachLoop, OptimizerLoop.
+ *                           In this case, to optimize memory consumption (see squeezeMemory method), we need to know if data coming from \a fromPort
+ *                           will be generated again in case.
+ *                           If true (the default) it means that for that link is a link loop proof so no need to take care. If false, the link is not loop proof so
+ *                           event in the context of agressive memory management the data can't be safely released.
+ */
+void InPort::edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof)
 {
-  _backLinks.insert(fromPort);
+  auto it(std::find_if(_backLinks.begin(),_backLinks.end(),[fromPort](const std::pair<OutPort *,bool>& p){ return p.first==fromPort; }));
+  if(it!=_backLinks.end())
+    _backLinks.erase(it);
+  _backLinks.insert(std::pair<OutPort *,bool>(fromPort,isLoopProof));
   modified();
 }
 
 void InPort::edNotifyDereferencedBy(OutPort *fromPort)
 {
-  _backLinks.erase(fromPort);
+  auto it(std::find_if(_backLinks.begin(),_backLinks.end(),[fromPort](const std::pair<OutPort *,bool>& p){ return p.first==fromPort; }));
+  if(it!=_backLinks.end())
+    _backLinks.erase(it);
   modified();
 }
 
index 4778aa979ce24f29df7e8c7d0e6b94984cd33ede..bb8a221d7a0b3ee2fb9019452f4ca61597da5edb 100644 (file)
@@ -62,17 +62,19 @@ namespace YACS
       virtual InPort *getPublicRepresentant() { return this; }
       virtual int edGetNumberOfLinks() const;
       virtual std::set<OutPort *> edSetOutPort() const;
+      bool canSafelySqueezeMemory() const;
+      bool isBackLinked() const { return !_backLinks.empty(); }
       virtual ~InPort();
       virtual std::string typeName() {return "YACS__ENGINE__InPort";}
     protected:
       InPort(const InPort& other, Node *newHelder);
       InPort(const std::string& name, Node *node, TypeCode* type);
       void edRemoveAllLinksLinkedWithMe() throw(Exception);
-      virtual void edNotifyReferencedBy(OutPort *fromPort);
+      virtual void edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof = true);
       virtual void edNotifyDereferencedBy(OutPort *fromPort);
       virtual void getAllRepresentants(std::set<InPort *>& repr) const;
     protected:
-      std::set<OutPort *> _backLinks;
+      std::set< std::pair<OutPort *,bool> > _backLinks;
     };
   }
 }
index b5a8bfb692df0ab5ccb4f9f7694527f73eecf608..3d40054710c04bedfd6806853c3e5270a9780884 100644 (file)
@@ -89,6 +89,13 @@ InPropertyPort::put(const void *data) throw(ConversionException)
   put((YACS::ENGINE::Any *)data);
 }
 
+void InPropertyPort::releaseData()
+{
+  if(_property_data)
+    _property_data->decrRef();
+  _property_data = nullptr;
+}
+
 void
 InPropertyPort::put(YACS::ENGINE::Any *data)
 {
@@ -100,9 +107,7 @@ InPropertyPort::put(YACS::ENGINE::Any *data)
     std::string value = ((*seq_data)[i]["value"])->getStringValue();
     exNewPropertyValue(key, value);
   }
-
-  if(_property_data)
-    _property_data->decrRef();
+  InPropertyPort::releaseData();
   _property_data = data;
   _property_data->incrRef();
   DEBTRACE("value ref count: " << _property_data->getRefCnt());
index cd0677ca6e50057fd704a595ef3e007895f1c099..c9cff5b45501dbc95627129b9a63a797f14f521c 100644 (file)
@@ -61,7 +61,7 @@ namespace YACS
       virtual void *get() const;
       virtual void put(const void *data) throw(ConversionException);
       virtual void put(YACS::ENGINE::Any *data);
-
+      void releaseData() override;
     protected:
       InPropertyPort(const InPropertyPort& other, Node *newHelder);
       InPropertyPort(const std::string& name, Node *node, TypeCode* type, bool canBeNull = false);
index eaef5d34f0ccce9f74c1535240bd95cd841ad977..a0b141397cd7833b95d24b5ecf9eb9ce0f582181 100644 (file)
@@ -206,9 +206,9 @@ InputPort *ProxyPort::clone(Node *newHelder) const
   throw Exception("ProxyPort::clone : internal error - should never happened");
 }
 
-void ProxyPort::edNotifyReferencedBy(OutPort *fromPort)
+void ProxyPort::edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof)
 {
-  _port->edNotifyReferencedBy(fromPort);
+  _port->edNotifyReferencedBy(fromPort,isLoopProof);
 }
 
 void ProxyPort::edNotifyDereferencedBy(OutPort *fromPort)
@@ -250,6 +250,11 @@ void *ProxyPort::get() const
   return _port->get();
 }
 
+void ProxyPort::releaseData()
+{
+  _port->releaseData();
+}
+
 void ProxyPort::put(const void *data) throw(ConversionException)
 {
   _port->put(data);
index 47326878c856bbea07e7b6dbe466a09991148036..91b368fe5e2f67a5ef97a7165173d154e7f86983 100644 (file)
@@ -74,6 +74,7 @@ namespace YACS
       virtual bool isEmpty();
 
       virtual void *get() const = 0;
+      virtual void releaseData() = 0;
       virtual void put(const void *data) throw(ConversionException) = 0;
       virtual std::string dump();
       virtual std::string getHumanRepr();
@@ -83,6 +84,7 @@ namespace YACS
     protected:
       InputPort(const InputPort& other, Node *newHelder);
       InputPort(const std::string& name, Node *node, TypeCode* type, bool canBeNull = false);
+      void releaseDataUnsafe();
     protected:
       Any *_initValue;
       std::string _stringRef;
@@ -102,7 +104,7 @@ namespace YACS
       
       void edRemoveAllLinksLinkedWithMe() throw(Exception);
       InputPort *clone(Node *newHelder) const;
-      void edNotifyReferencedBy(OutPort *fromPort);
+      void edNotifyReferencedBy(OutPort *fromPort, bool isLoopProof = true) override;
       void edNotifyDereferencedBy(OutPort *fromPort);
       std::set<OutPort *> edSetOutPort() const;
 #ifdef NOCOVARIANT
@@ -112,6 +114,7 @@ namespace YACS
 #endif
       void *get() const;
       virtual void put(const void *data) throw(ConversionException) ;
+      void releaseData() override;
       int edGetNumberOfLinks() const;
       bool isIntermediate() const { return true; }
       void exRestoreInit();
index 1634e1b49774110b5c9949d53664a3e927e5fd52..7f0a290978f8366a9f7fae4773f05c1179d039be 100644 (file)
@@ -85,10 +85,16 @@ InputPort *InputPort4DF2DS::clone(Node *newHelder) const
   throw Exception("InputPort4DF2DS::clone : internal error");
 }
 
-void InputPort4DF2DS::put(Any *data)
+void InputPort4DF2DS::releaseData()
 {
   if(_data)
     _data->decrRef();
+  _data = nullptr;
+}
+
+void InputPort4DF2DS::put(Any *data)
+{
+  InputPort4DF2DS::releaseData();
   _data=data;
   _data->incrRef();
 }
index b606029f40637fff096157cef87c99c25a5f86d5..4b67377dab30b32de90c354732b3224eab87f522 100644 (file)
@@ -43,6 +43,7 @@ namespace YACS
     public:
       InputPort4DF2DS(DFToDSForLoop *node, TypeCode* type);
       void getAllRepresentants(std::set<InPort *>& repr) const;
+      void releaseData() override;
       void put(const void *data) throw(ConversionException);
       InputPort *clone(Node *newHelder) const;
       void *get() const throw(Exception);
@@ -162,6 +163,7 @@ namespace YACS
       //Node* DISOWNnode is a SWIG notation to indicate that the ownership of the node is transfered to C++
       Node *edSetNode(Node *DISOWNnode);
       virtual bool edAddChild(Node *DISOWNnode) throw(Exception);
+      bool isLoop() const override { return true; }
       Node *edRemoveNode();
       virtual void checkBasicConsistency() const throw(Exception);
       //! Returns the port which value is used to take decision about the continuation of the loop.
index f7d8196adc85498b2e14f0fbef071b91ebf820af..fb7d4e841555ff5880812c87d0304d334bb9712f 100644 (file)
@@ -86,6 +86,7 @@ Node::Node(const Node& other, ComposedNode *father):_inGate(this),_outGate(this)
 
   // Every node has an InPropertyPort
   _inPropertyPort = new InPropertyPort("__InPropertyPort__Node__YACS_", this, Runtime::_tc_propvec);
+  _eventReceiver=const_cast<Node *>(&other);
 }
 
 Node::~Node()
@@ -683,8 +684,15 @@ void Node::applyDPLScope(ComposedNode *gfn)
 void Node::sendEvent(const std::string& event)
 {
   DEBTRACE("Node::sendEvent " << event);
-  Dispatcher* disp=Dispatcher::getDispatcher();
-  disp->dispatch(this,event);
+  Dispatcher *disp(Dispatcher::getDispatcher());
+  if(!_eventReceiver)
+    {
+      disp->dispatch(this,event);
+    }
+  else
+    {
+      disp->dispatchFromClone(_eventReceiver,event,this);
+    }
 }
 
 //! emit notification to all observers registered with  the dispatcher 
index fbd22767001f97d43210899c01f629f32508118f..9ec273ff96e041549f669c23df52b9ff1a9a6038 100644 (file)
@@ -96,6 +96,7 @@ namespace YACS
       int _numId;
       std::string _implementation;
       std::map<std::string,std::string> _propertyMap;
+      Node *_eventReceiver=nullptr;
     protected:
       Node(const std::string& name);
       Node(const Node& other, ComposedNode *father);
index e909f42b77a91fde79743e7a61e7bcd6f71adf21..5cad3c4f9c36f895a20ebec8cc6f5a45646846b9 100644 (file)
@@ -105,7 +105,6 @@ namespace YACS
       OutPort *getOutPort(const std::string& name) const throw(Exception);
       OutputPort *getOutputPort(const std::string& name) const throw(Exception);
       YACS::Event finalize();
-
     protected:
       virtual YACS::Event updateStateOnFailedEventFrom(Node *node, const Executor *execInst);
       void buildDelegateOf(InPort * & port, OutPort *initialStart, const std::list<ComposedNode *>& pointsOfView);
index efdd7f8657150f368bb06e92db94e1935ed3c9cd..d97035268c29e22178293ab2f6c7bdf3bb07f17e 100644 (file)
@@ -41,6 +41,13 @@ TestElemInputPort::TestElemInputPort(const TestElemInputPort& other, Node *newHe
 {
 }
 
+void TestElemInputPort::releaseData()
+{
+  stringstream msg;
+  msg << "Not implemented (" << __FILE__ << ":" << __LINE__ << ")";
+  throw Exception(msg.str());
+}
+
 void TestElemInputPort::put(const void *data) throw(ConversionException)
 {
   cerr << _name << endl;
index ba0693ee20da222fab53de0326c1a45896d2d1a2..e8c3a891290d27ef712a51b681d152d60316a78c 100644 (file)
@@ -44,6 +44,7 @@ namespace YACS
     public:
       TestElemInputPort(const std::string& name, Node *node, TypeCode* type);
       TestElemInputPort(const TestElemInputPort& other, Node *newHelder);
+      void releaseData() override;
       void put(const void *data) throw(ConversionException);
       InputPort *clone(Node *newHelder) const;
       void *get() const throw(Exception);
index c971f86b28d126dc28bd5404e307c5f310bf020f..f489ed901922bcc44b908a1905a22250459ef895 100644 (file)
@@ -95,10 +95,16 @@ InputPort *InputToyPort::clone(Node *newHelder) const
   return new InputToyPort(*this,newHelder);
 }
 
-void InputToyPort::put(Any *data)
+void InputToyPort::releaseData()
 {
   if(_data)
     _data->decrRef();
+  _data=nullptr;
+}
+
+void InputToyPort::put(Any *data)
+{
+  InputToyPort::releaseData();
   _data=data;
   _data->incrRef();
 }
@@ -599,10 +605,16 @@ void InputLimitPort::edRemoveManInit()
   InputPort::edRemoveManInit();
 }
 
-void InputLimitPort::put(Any *data)
+void InputLimitPort::releaseData()
 {
   if(_data)
     _data->decrRef();
+  _data=nullptr;
+}
+
+void InputLimitPort::put(Any *data)
+{
+  InputLimitPort::releaseData();
   _data=data;
   _data->incrRef();
 }
index 820be259f64511081c76258e655b30b1b5f4182e..4c463295475301f6e3b185cbbd10e9d1713e7ee0 100644 (file)
@@ -35,6 +35,7 @@ namespace YACS
     public:
       InputToyPort(const InputToyPort& other, Node *newHelder);
       InputToyPort(const std::string& name, Node *node);
+      void releaseData() override;
       void put(const void *data) throw(ConversionException);
       InputPort *clone(Node *newHelder) const;
       bool edIsManuallyInitialized() const;
@@ -201,6 +202,7 @@ namespace YACS
       bool edIsManuallyInitialized() const;
       void *get() const throw(Exception);
       void edRemoveManInit();
+      void releaseData() override;
       void put(Any *data);
       ~InputLimitPort();
       Any *getAny() { return _data; }
index 989fac82ef0f28247a1bb2ccce12bfc3a4aecfe7..6cd9bcbdc9cbe2b334de2f486a4b9d9b77bf1921 100644 (file)
@@ -161,6 +161,10 @@ void display(CORBA::Any* data)
     }
 }
 
+void InputCorbaPort::releaseData()
+{//do nothing - to be implemented
+}
+
 void InputCorbaPort::put(CORBA::Any *data) throw (ConversionException)
 {
 #ifdef REFCNT
index d36f089f818f78051ac71f2abd608d270b1c4c11..ddf2976058bda63dad0e0a27a4b2779233114e9d 100644 (file)
@@ -51,6 +51,7 @@ namespace YACS
       void edRemoveManInit();
       virtual void put(const void *data) throw(ConversionException);
       void put(CORBA::Any *data) throw (ConversionException);
+      void releaseData() override;
       InputPort *clone(Node *newHelder) const;
       void *get() const throw(Exception);
       virtual bool isEmpty();
index b464449d13cbba4d3700c84feb7409326e63de14..a46f8676539dc2c69a7ffc8fe0e41664d9e8089f 100644 (file)
@@ -67,10 +67,16 @@ void InputCppPort::put(const void *data) throw(ConversionException)
   put((YACS::ENGINE::Any *)data);
 }
 
-void InputCppPort::put(YACS::ENGINE::Any *data) throw(ConversionException)
+void InputCppPort::releaseData()
 {
   if(_data)
     _data->decrRef();
+  _data=nullptr;
+}
+
+void InputCppPort::put(YACS::ENGINE::Any *data) throw(ConversionException)
+{
+  releaseData();
   _data=data;
   _data->incrRef();
   DEBTRACE("value ref count: " << _data->getRefCnt());
index 262152aa75768c21fd96caec64d25f3d83afbbb7..28fb44e6d55e1fc8357c3e43666f99f823ff8e09 100644 (file)
@@ -46,6 +46,7 @@ namespace YACS
       void edRemoveManInit();
       virtual void put(const void *data) throw(ConversionException);
       void put(YACS::ENGINE::Any *data) throw(ConversionException);
+      void releaseData() override;
       InputPort *clone(Node *newHelder) const;
       virtual YACS::ENGINE::Any * getCppObj() const;
       void *get() const throw(Exception);
index c09c5a1159e543a84f97e201876b717c43dae807..0227c2adfc4be76ee830bd924fb2c1f213bf6194 100644 (file)
@@ -304,7 +304,7 @@ void PythonEntry::commonRemoteLoad(InlineNode *reqNode)
   commonRemoteLoadPart3(reqNode,objContainer,isInitializeRequested);
 }
 
-PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father)
+PythonNode::PythonNode(const PythonNode& other, ComposedNode *father):InlineNode(other,father),_autoSqueeze(other._autoSqueeze)
 {
   _implementation=IMPL_NAME;
   {
@@ -542,6 +542,8 @@ void PythonNode::executeRemote()
           _errorDetails=ex.what();
           throw;
       }
+      if(_autoSqueeze)
+        squeezeMemoryRemote();
   }
   //
   if(!CORBA::is_nil(_pynode))
@@ -654,12 +656,54 @@ void PythonNode::executeLocal()
         _errorDetails=ex.what();
         throw;
     }
-
+    if(_autoSqueeze)
+      squeezeMemory();
     DEBTRACE( "-----------------End PyNode::outputs-----------------" );
   }
   DEBTRACE( "++++++++++++++ End PyNode::execute: " << getName() << " ++++++++++++++++++++" );
 }
 
+void PythonNode::squeezeMemorySafe()
+{
+  AutoGIL agil;
+  if(_mode==PythonNode::REMOTE_NAME)
+    this->squeezeMemoryRemote();
+  else
+    this->squeezeMemory();
+}
+  
+void PythonNode::squeezeMemory()
+{
+  for(auto p : _setOfInputPort)
+    {
+      PyDict_DelItemString(_context,p->getName().c_str());
+      InputPyPort *p2(static_cast<InputPyPort *>(p));
+      if(p2->canSafelySqueezeMemory())
+        p2->put(Py_None);
+    }
+  for(auto p : _setOfOutputPort)
+    {
+      PyDict_DelItemString(_context,p->getName().c_str());
+      OutputPyPort *p2(static_cast<OutputPyPort *>(p));
+      p2->putWithoutForward(Py_None);
+    }
+}
+
+void PythonNode::squeezeMemoryRemote()
+{
+  for(auto p : _setOfInputPort)
+    {
+      InputPyPort *p2(static_cast<InputPyPort *>(p));
+      if(p2->canSafelySqueezeMemory())
+        p2->put(Py_None);
+    }
+  for(auto p : _setOfOutputPort)
+    {
+      OutputPyPort *p2(static_cast<OutputPyPort *>(p));
+      p2->putWithoutForward(Py_None);
+    }
+}
+
 std::string PythonNode::getContainerLog()
 {
   return PythonEntry::GetContainerLog(_mode,_container,this);
index fdac25950efd5bcdbc0889c97a714e5f22e7a726..8e05906dfdb5ad09ba581b92908d3d4b4aa162ce 100644 (file)
@@ -85,6 +85,12 @@ namespace YACS
       PythonNode* cloneNode(const std::string& name);
       virtual std::string typeName() { return "YACS__ENGINE__PythonNode"; }
       void applyDPLScope(ComposedNode *gfn);
+      void setSqueezeStatus(bool sqStatus) { _autoSqueeze=sqStatus; }
+      bool getSqueezeStatus() const { return _autoSqueeze; }
+      void squeezeMemorySafe();
+    protected:
+      void squeezeMemory();
+      void squeezeMemoryRemote();
     public:
       static const char KIND[];
       static const char IMPL_NAME[];
@@ -92,6 +98,7 @@ namespace YACS
       static const char REMOTE_NAME[];
       static const char DPL_INFO_NAME[];
     protected:
+      bool _autoSqueeze = false;
       Engines::PyScriptNode_var _pynode;
     };
 
index 2da1567aee06078f98a15ef0c1e0669aef238cc1..dfe676e1f8057c715d642d914343cf0420f87bc3 100644 (file)
@@ -34,6 +34,8 @@ using namespace std;
 
 void releasePyObj(PyObject* data)
 {
+  if(!data)
+    return ;
   DEBTRACE( "data refcnt: " << data->ob_refcnt );
   if (PyObject_HasAttrString(data, (char*)"_is_a"))
     {
@@ -159,11 +161,23 @@ void InputPyPort::put(const void *data) throw(ConversionException)
   put((PyObject *)data);
 }
 
-void InputPyPort::put(PyObject *data) throw(ConversionException)
+void InputPyPort::releaseDataUnsafe()
 {
-  InterpreterUnlocker l;
   releasePyObj(_data);
   Py_XDECREF(_data); 
+  _data = nullptr;
+}
+
+void InputPyPort::releaseData()
+{
+  InterpreterUnlocker l;
+  InputPyPort::releaseDataUnsafe();
+}
+
+void InputPyPort::put(PyObject *data) throw(ConversionException)
+{
+  InterpreterUnlocker l;
+  InputPyPort::releaseDataUnsafe();
   _data = data;
   _stringRef="";
   Py_INCREF(_data); 
@@ -315,9 +329,8 @@ void OutputPyPort::put(const void *data) throw(ConversionException)
   put((PyObject *)data);
 }
 
-void OutputPyPort::put(PyObject *data) throw(ConversionException)
+void OutputPyPort::putWithoutForward(PyObject *data) throw(ConversionException)
 {
-  InputPort *p;
   DEBTRACE( "OutputPyPort::put.ob refcnt: " << data->ob_refcnt );
 #ifdef _DEVDEBUG_
   PyObject_Print(data,stderr,Py_PRINT_RAW);
@@ -328,6 +341,11 @@ void OutputPyPort::put(PyObject *data) throw(ConversionException)
   _data = data;
   Py_INCREF(_data); 
   //no registerPyObj : we steal the output reference of the node
+}
+
+void OutputPyPort::put(PyObject *data) throw(ConversionException)
+{
+  putWithoutForward(data);
   DEBTRACE( "OutputPyPort::put.ob refcnt: " << data->ob_refcnt );
   OutputPort::put(data);
 }
index b391e00283d1ae90f574205b9285a91def88306d..9945a1c434fcebe1d7e668266d1a0d63128806fb 100644 (file)
@@ -79,6 +79,7 @@ namespace YACS
       bool edIsManuallyInitialized() const;
       void edRemoveManInit();
       virtual void put(const void *data) throw(ConversionException);
+      void releaseData() override;
       void put(PyObject *data) throw(ConversionException);
       InputPort *clone(Node *newHelder) const;
       //special typedef PyObj used in SWIG to increment ref count on output
@@ -93,7 +94,8 @@ namespace YACS
       virtual std::string typeName() {return "YACS__ENGINE__InputPyPort";}
       virtual std::string valToStr();
       virtual void valFromStr(std::string valstr);
-
+    protected:
+      void releaseDataUnsafe();
     protected:
       PyObject* _data;
       PyObject* _initData;
@@ -107,6 +109,7 @@ namespace YACS
       OutputPyPort(const OutputPyPort& other, Node *newHelder);
       ~OutputPyPort();
       virtual void put(const void *data) throw(ConversionException);
+      void putWithoutForward(PyObject *data) throw(ConversionException);
       void put(PyObject *data) throw(ConversionException);
       OutputPort *clone(Node *newHelder) const;
       virtual PyObject * get() const;
index 8e8bba3b315bac6b7763955bc5553336445d99ed..606f07d61c7375849403958305e38ed0e9273dc0 100644 (file)
@@ -70,6 +70,10 @@ void InputXmlPort::put(const char *data) throw (ConversionException)
   _data = data;
 }
 
+void InputXmlPort::releaseData()
+{//nothing because _data has no ref counter
+}
+
 bool InputXmlPort::isEmpty()
 {
   return _data.empty();
index 66ed9c87779db25a7a6ddcbb7ab3ae6cb1da92c9..61836d3c91280cb264d1b3d148092e3decc998a7 100644 (file)
@@ -43,6 +43,7 @@ namespace YACS
       void edRemoveManInit();
       virtual void put(const void *data) throw (ConversionException);
       void put(const char *data) throw (ConversionException);
+      void releaseData() override;
       InputPort *clone(Node *newHelder) const;
       virtual const char * getXml() const;
       void *get() const throw(Exception);