From 39d936d1fb1700f73c84046b8230b7a5b774859b Mon Sep 17 00:00:00 2001 From: caremoli Date: Mon, 28 Sep 2009 13:54:34 +0000 Subject: [PATCH] CCAR: add a timeout for DSC/CALCIUM reads If a read (cplxxx) is waiting more than the value given by the environment variable DSC_TIMEOUT, the read is interrupted and an exception is raised. If DSC_TIMEOUT is not set or its value is 0, the timeout is infinite. It is possible to set the timeout by using the properties of the component. The value of a property is set in an environment variable with the same name by a call to beginService --- src/DSC/DSC_Python/calcium.i | 7 +- src/DSC/DSC_Python/dsccalcium.py | 4 + .../Calcium/CalciumCxxInterface.hxx | 58 +++-- src/DSC/DSC_User/Datastream/GenericPort.hxx | 209 ++++++++++-------- src/DSC/DSC_User/Superv_Component_i.cxx | 23 ++ src/DSC/DSC_User/Superv_Component_i.hxx | 11 +- 6 files changed, 202 insertions(+), 110 deletions(-) diff --git a/src/DSC/DSC_Python/calcium.i b/src/DSC/DSC_Python/calcium.i index 768cb438c..bc504c30f 100644 --- a/src/DSC/DSC_Python/calcium.i +++ b/src/DSC/DSC_Python/calcium.i @@ -419,7 +419,6 @@ struct stringArray } ~stringArray() { - std::cerr << "~stringArray() " << nelem << std::endl; for(int i=0;icxxObjRefToPyObjRef($1, 1); - //All output Ports::Port_ptr variables are duplicated by security. Need to release them for python . Explanation ?? + //All output Ports::Port_ptr variables are duplicated by security. Need to release them for python. CORBA::release($1); } %typemap(out) Ports::PortProperties_ptr, Engines::Salome_file_ptr { $result = api->cxxObjRefToPyObjRef($1, 1); + //the _ptr is duplicated by the routine called. + //Need to release it for Python because the call to cxxObjRefToPyObjRef has created another ref with a count of 1 + CORBA::release($1); } %typemap(out) Engines::DSC::uses_port * @@ -801,6 +803,7 @@ class PySupervCompo:public Superv_Component_i CORBA::Boolean is_connected(const char* port_name) throw (Engines::DSC::PortNotDefined); // End of DSC interface for python components + static void setTimeOut(); %extend diff --git a/src/DSC/DSC_Python/dsccalcium.py b/src/DSC/DSC_Python/dsccalcium.py index 6aa2d6c4b..e4d4b7f1c 100644 --- a/src/DSC/DSC_Python/dsccalcium.py +++ b/src/DSC/DSC_Python/dsccalcium.py @@ -96,3 +96,7 @@ class PyDSCComponent(SALOME_ComponentPy.SALOME_ComponentPy_i, def checkOutputFilesToService(self,service_name): return self.proxy.checkOutputFilesToService(service_name) + def beginService(self,service_name): + SALOME_ComponentPy.SALOME_ComponentPy_i.beginService(self,service_name) + self.proxy.setTimeOut() + diff --git a/src/DSC/DSC_User/Datastream/Calcium/CalciumCxxInterface.hxx b/src/DSC/DSC_User/Datastream/Calcium/CalciumCxxInterface.hxx index 4b766d3b7..18738f46a 100644 --- a/src/DSC/DSC_User/Datastream/Calcium/CalciumCxxInterface.hxx +++ b/src/DSC/DSC_User/Datastream/Calcium/CalciumCxxInterface.hxx @@ -259,36 +259,60 @@ namespace CalciumInterface { std::stringstream msgout,msg; if ( _dependencyType == CalciumTypes::TIME_DEPENDENCY ) { - double tt=ti; - msg << "ti=" << ti << ", tf=" << tf ; - writeEvent("BEGIN_READ",containerName,componentName,nomVar.c_str(),0,msg.str().c_str()); - corbaData = port->get(tt,tf, 0); - msgout << "read t=" << tt ; + try + { + double tt=ti; + msg << "ti=" << ti << ", tf=" << tf ; + writeEvent("BEGIN_READ",containerName,componentName,nomVar.c_str(),0,msg.str().c_str()); + corbaData = port->get(tt,tf, 0); + msgout << "read t=" << tt ; #ifdef MYDEBUG - std::cout << "-------- CalciumInterface(ecp_lecture) MARK 5 ------------------" << std::endl; + std::cout << "-------- CalciumInterface(ecp_lecture) MARK 5 ------------------" << std::endl; #endif + } + catch ( const DSC_Exception & ex) + { + writeEvent("END_READ",containerName,componentName,nomVar.c_str(),0,ex.what()); + throw; + } } else if ( _dependencyType == CalciumTypes::ITERATION_DEPENDENCY ) { - msg << "i=" << i ; - writeEvent("BEGIN_READ",containerName,componentName,nomVar.c_str(),0,msg.str().c_str()); - corbaData = port->get(0, i); - msgout << "read i=" << i ; + try + { + msg << "i=" << i ; + writeEvent("BEGIN_READ",containerName,componentName,nomVar.c_str(),0,msg.str().c_str()); + corbaData = port->get(0, i); + msgout << "read i=" << i ; #ifdef MYDEBUG - std::cout << "-------- CalciumInterface(ecp_lecture) MARK 6 ------------------" << std::endl; + std::cout << "-------- CalciumInterface(ecp_lecture) MARK 6 ------------------" << std::endl; #endif + } + catch ( const DSC_Exception & ex) + { + writeEvent("END_READ",containerName,componentName,nomVar.c_str(),0,ex.what()); + throw; + } } else { // Sequential read + try + { #ifdef MYDEBUG - std::cout << "-------- CalciumInterface(ecp_lecture) MARK 7 ------------------" << std::endl; + std::cout << "-------- CalciumInterface(ecp_lecture) MARK 7 ------------------" << std::endl; #endif - writeEvent("BEGIN_READ",containerName,componentName,nomVar.c_str(),0,"Sequential read"); - corbaData = port->next(ti,i); - msgout << "read "; - if(i==0)msgout<< "t=" <next(ti,i); + msgout << "read "; + if(i==0)msgout<< "t=" < @@ -116,7 +118,7 @@ GenericPort::~GenericPort() { template < typename DataManipulator, typename COUPLING_POLICY> void GenericPort::close (PortableServer::POA_var poa, - PortableServer::ObjectId_var id) { + PortableServer::ObjectId_var id) { // Ferme le port en supprimant le servant // La desactivation du servant du POA provoque sa suppression poa->deactivate_object (id); @@ -148,8 +150,8 @@ GenericPort::wakeupWaiting() template < typename DataManipulator, typename COUPLING_POLICY> template < typename TimeType,typename TagType> void GenericPort::put(CorbaInDataType dataParam, - TimeType time, - TagType tag) { + TimeType time, + TagType tag) { fflush(stdout); fflush(stderr); try { @@ -229,25 +231,25 @@ void GenericPort::put(CorbaInDataType dataPara #endif // On n'a pas trouvé de dataId supérieur au notre ou - // on a trouvé une clé > à cet Id + // on a trouvé une clé > à cet Id if (wDataIt == storedDatas.end() || storedDatas.key_comp()(currentDataId,(*wDataIt).first) ) { #ifdef MYDEBUG - std::cout << "-------- Put : MARK 6 ------------------" << std::endl; + std::cout << "-------- Put : MARK 6 ------------------" << std::endl; #endif - // Ajoute la donnee dans la table - wDataIt = storedDatas.insert(wDataIt, make_pair (currentDataId, data)); + // Ajoute la donnee dans la table + wDataIt = storedDatas.insert(wDataIt, make_pair (currentDataId, data)); } else { - // Si on n'est pas en fin de liste et qu'il n'y a pas de relation d'ordre strict - // entre notre dataId et le DataId pointé c'est qu'ils sont identiques + // Si on n'est pas en fin de liste et qu'il n'y a pas de relation d'ordre strict + // entre notre dataId et le DataId pointé c'est qu'ils sont identiques #ifdef MYDEBUG - std::cout << "-------- Put : MARK 7 ------------------" << std::endl; + std::cout << "-------- Put : MARK 7 ------------------" << std::endl; #endif - // Les données sont remplacées par les nouvelles valeurs - // lorsque que le dataId existe déjà - DataType old_data = (*wDataIt).second; - (*wDataIt).second = data; - // Detruit la vieille donnee - DataManipulator::delete_data (old_data); + // Les données sont remplacées par les nouvelles valeurs + // lorsque que le dataId existe déjà + DataType old_data = (*wDataIt).second; + (*wDataIt).second = data; + // Detruit la vieille donnee + DataManipulator::delete_data (old_data); } #ifdef MYDEBUG @@ -278,14 +280,14 @@ void GenericPort::put(CorbaInDataType dataPara // sur les dataIds (à cause du lock utilisé dans la méthode put et les méthodes get ) // rem : Utilisation de l'évaluation gauche droite du logical C or if ( waitingForAnyDataId || - ( waitingForConvenientDataId && - isDataIdConveniant(storedDatas, expectedDataId, dummy1, dummy2, dummy3) ) - ) { + ( waitingForConvenientDataId && + isDataIdConveniant(storedDatas, expectedDataId, dummy1, dummy2, dummy3) ) + ) { #ifdef MYDEBUG - std::cout << "-------- Put : MARK 10 ------------------" << std::endl; + std::cout << "-------- Put : MARK 10 ------------------" << std::endl; #endif - //Doit pouvoir réveiller le get ici (a vérifier) - expectedDataReceived = true; + //Doit pouvoir réveiller le get ici (a vérifier) + expectedDataReceived = true; } } @@ -296,9 +298,9 @@ void GenericPort::put(CorbaInDataType dataPara // si waitingForAnyDataId était positionné, c'est forcément lui qui a activer // expectedDataReceived à true if (waitingForAnyDataId) - waitingForAnyDataId = false; + waitingForAnyDataId = false; else - waitingForConvenientDataId = false; + waitingForConvenientDataId = false; // Reveille le thread du destinataire (stoppe son attente) // Ne faudrait-il pas réveiller plutôt tous les threads ? // Celui réveillé ne correspond pas forcément à celui qui demande @@ -348,7 +350,7 @@ template < typename DataManipulator, typename COUPLING_POLICY > template < typename TimeType,typename TagType> typename DataManipulator::Type GenericPort::get(TimeType time, - TagType tag) + TagType tag) // REM : Laisse passer toutes les exceptions // En particulier les SALOME_Exceptions qui viennent de la COUPLING_POLICY // Pour déclarer le throw avec l'exception spécifique il faut que je vérifie @@ -390,33 +392,33 @@ GenericPort::get(TimeType time, if ( isEqual ) { #ifdef MYDEBUG - std::cout << "-------- Get : MARK 4 ------------------" << std::endl; + std::cout << "-------- Get : MARK 4 ------------------" << std::endl; #endif - // La propriété de la données N'EST PAS transmise à l'utilisateur en mode CALCIUM. - // Si l'utilisateur supprime la donnée, storedDataIds devient incohérent - // C'est EraseDataId qui choisi ou non de supprimer la donnée - // Du coup interaction potentielle entre le 0 copy et gestion de l'historique - dataToTransmit = (*wDataIt1).second; + // La propriété de la données N'EST PAS transmise à l'utilisateur en mode CALCIUM. + // Si l'utilisateur supprime la donnée, storedDataIds devient incohérent + // C'est EraseDataId qui choisi ou non de supprimer la donnée + // Du coup interaction potentielle entre le 0 copy et gestion de l'historique + dataToTransmit = (*wDataIt1).second; #ifdef MYDEBUG - std::cout << "-------- Get : MARK 5 ------------------" << std::endl; - std::cout << "-------- Get : Données trouvées à t : " << std::endl; - typename DataManipulator::InnerType const * const InIt1 = DataManipulator::getPointer(dataToTransmit); - size_t N = DataManipulator::size(dataToTransmit); - std::copy(InIt1, InIt1 + N, - std::ostream_iterator< InnerType > (std::cout," ")); - std::cout << std::endl; + std::cout << "-------- Get : MARK 5 ------------------" << std::endl; + std::cout << "-------- Get : Données trouvées à t : " << std::endl; + typename DataManipulator::InnerType const * const InIt1 = DataManipulator::getPointer(dataToTransmit); + size_t N = DataManipulator::size(dataToTransmit); + std::copy(InIt1, InIt1 + N, + std::ostream_iterator< InnerType > (std::cout," ")); + std::cout << std::endl; #endif - // Décide de la suppression de certaines instances de données - // La donnée contenu dans la structure CORBA et son dataId sont désallouées - // Méthode provenant de la COUPLING_POLICY - typename COUPLING_POLICY::template EraseDataIdProcessor processEraseDataId(*this); - processEraseDataId.apply(storedDatas,wDataIt1); + // Décide de la suppression de certaines instances de données + // La donnée contenu dans la structure CORBA et son dataId sont désallouées + // Méthode provenant de la COUPLING_POLICY + typename COUPLING_POLICY::template EraseDataIdProcessor processEraseDataId(*this); + processEraseDataId.apply(storedDatas,wDataIt1); #ifdef MYDEBUG - std::cout << "-------- Get : MARK 6 ------------------" << std::endl; + std::cout << "-------- Get : MARK 6 ------------------" << std::endl; #endif - break; + break; } #ifdef MYDEBUG @@ -427,45 +429,45 @@ GenericPort::get(TimeType time, // Le DataId demandé n'est pas trouvé mais est encadré ET la politique de couplage // implémente une méthode processBoundedDataId capable de générer les données à retourner if ( isBounded ) { - // Pour être cohérent avec la politique du bloc précédent - // on stocke la paire (dataId,données interpolées ). - // CALCIUM ne stockait pas les données interpolées. - // Cependant comme les données sont censées être produites - // par ordre croissant de DataId, de nouvelles données ne devrait pas améliorer - // l'interpolation. -#ifdef MYDEBUG - std::cout << "-------- Get : MARK 8 ------------------" << std::endl; -#endif - - typedef typename COUPLING_POLICY::template BoundedDataIdProcessor BDI; - BDI processBoundedDataId(*this); - // typename COUPLING_POLICY::template BoundedDataIdProcessor processBoundedDataId(*this); - //si static BDIP::apply(dataToTransmit,expectedDataId,wDataIt1); - //ancienne version template processBoundedDataId(dataToTransmit,expectedDataId,wDataIt1); - //BDIP processBoundedDataId; - processBoundedDataId.apply(dataToTransmit,expectedDataId,wDataIt1); + // Pour être cohérent avec la politique du bloc précédent + // on stocke la paire (dataId,données interpolées ). + // CALCIUM ne stockait pas les données interpolées. + // Cependant comme les données sont censées être produites + // par ordre croissant de DataId, de nouvelles données ne devrait pas améliorer + // l'interpolation. +#ifdef MYDEBUG + std::cout << "-------- Get : MARK 8 ------------------" << std::endl; +#endif + + typedef typename COUPLING_POLICY::template BoundedDataIdProcessor BDI; + BDI processBoundedDataId(*this); + // typename COUPLING_POLICY::template BoundedDataIdProcessor processBoundedDataId(*this); + //si static BDIP::apply(dataToTransmit,expectedDataId,wDataIt1); + //ancienne version template processBoundedDataId(dataToTransmit,expectedDataId,wDataIt1); + //BDIP processBoundedDataId; + processBoundedDataId.apply(dataToTransmit,expectedDataId,wDataIt1); - // Il ne peut pas y avoir déjà une clé expectedDataId dans storedDatas (utilisation de la notation [] ) - // La nouvelle donnée produite est stockée, ce n'était pas le cas dans CALCIUM - // Cette opération n'a peut être pas un caractère générique. - // A déplacer en paramètre de la méthode précédente ? ou déléguer ce choix au mode de couplage ? - storedDatas[expectedDataId]=dataToTransmit; + // Il ne peut pas y avoir déjà une clé expectedDataId dans storedDatas (utilisation de la notation [] ) + // La nouvelle donnée produite est stockée, ce n'était pas le cas dans CALCIUM + // Cette opération n'a peut être pas un caractère générique. + // A déplacer en paramètre de la méthode précédente ? ou déléguer ce choix au mode de couplage ? + storedDatas[expectedDataId]=dataToTransmit; #ifdef MYDEBUG - std::cout << "-------- Get : Données calculées à t : " << std::endl; - typename DataManipulator::InnerType const * const InIt1 = DataManipulator::getPointer(dataToTransmit); - size_t N = DataManipulator::size(dataToTransmit); + std::cout << "-------- Get : Données calculées à t : " << std::endl; + typename DataManipulator::InnerType const * const InIt1 = DataManipulator::getPointer(dataToTransmit); + size_t N = DataManipulator::size(dataToTransmit); - std::copy(InIt1, InIt1 + N, - std::ostream_iterator< InnerType > (std::cout," ")); - std::cout << std::endl; - std::cout << "-------- Get : MARK 9 ------------------" << std::endl; + std::copy(InIt1, InIt1 + N, + std::ostream_iterator< InnerType > (std::cout," ")); + std::cout << std::endl; + std::cout << "-------- Get : MARK 9 ------------------" << std::endl; #endif - typename COUPLING_POLICY::template EraseDataIdProcessor processEraseDataId(*this); - processEraseDataId.apply(storedDatas,wDataIt1); + typename COUPLING_POLICY::template EraseDataIdProcessor processEraseDataId(*this); + processEraseDataId.apply(storedDatas,wDataIt1); - break; + break; } // Délègue au mode de couplage la gestion d'une demande de donnée non disponible @@ -487,7 +489,23 @@ GenericPort::get(TimeType time, std::cout << "-------- Get : waiting datas ------------------" << std::endl; #endif fflush(stdout);fflush(stderr); - cond_instance.wait(); + unsigned long ts, tns,rs=Superv_Component_i::dscTimeOut; + if(rs==0) + cond_instance.wait(); + else + { + //Timed wait on omni condition + omni_thread::get_time(&ts,&tns, rs,0); + int success=cond_instance.timedwait(ts,tns); + if(!success) + { + // Waiting too long probably blocking + std::stringstream msg; + msg<<"Timeout ("< template < typename TimeType,typename TagType> typename DataManipulator::Type GenericPort::get(TimeType& ti, - TimeType tf, - TagType tag ) { + TimeType tf, + TagType tag ) { ti = COUPLING_POLICY::getEffectiveTime(ti,tf); return get(ti,tag); } @@ -531,7 +549,7 @@ template < typename DataManipulator, typename COUPLING_POLICY > template < typename TimeType,typename TagType> typename DataManipulator::Type GenericPort::next(TimeType &t, - TagType &tag ) { + TagType &tag ) { typedef typename COUPLING_POLICY::DataId DataId; @@ -571,7 +589,7 @@ GenericPort::next(TimeType &t, // Délègue au mode de couplage la gestion d'une demande de donnée non disponible // si le port est deconnecté if ( processDisconnect.apply(storedDatas, lastDataId, wDataIt1) ) { - waitingForAnyDataId = false; break; + waitingForAnyDataId = false; break; } #ifdef MYDEBUG @@ -585,19 +603,34 @@ GenericPort::next(TimeType &t, std::cout << "-------- Next : waiting datas ------------------" << std::endl; #endif fflush(stdout);fflush(stderr); - cond_instance.wait(); + unsigned long ts, tns,rs=Superv_Component_i::dscTimeOut; + if(rs==0) + cond_instance.wait(); + else + { + //Timed wait on omni condition + omni_thread::get_time(&ts,&tns, rs,0); + int success=cond_instance.timedwait(ts,tns); + if(!success) + { + // Waiting too long probably blocking + std::stringstream msg; + msg<<"Timeout ("< Superv_Component_i::_factory_map; +long Superv_Component_i::dscTimeOut=0; + Superv_Component_i::Superv_Component_i(CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, @@ -48,6 +50,7 @@ Superv_Component_i::Superv_Component_i(CORBA::ORB_ptr orb, #ifdef MYDEBUG std::cerr << "--Superv_Component_i : MARK 1 ---- " << instanceName << "----" << std::endl; #endif + setTimeOut(); } Superv_Component_i::Superv_Component_i(CORBA::ORB_ptr orb, PortableServer::POA_ptr poa, @@ -61,6 +64,7 @@ Superv_Component_i::Superv_Component_i(CORBA::ORB_ptr orb, #ifdef MYDEBUG std::cerr << "--Superv_Component_i : MARK 1 ---- " << instanceName << "----" << std::endl; #endif + setTimeOut(); } @@ -318,3 +322,22 @@ Superv_Component_i::get_uses_port_names(std::vector & port_names, for (it=my_superv_ports.begin(); it!=my_superv_ports.end();++it) if( (*it).second->p_ref == NULL ) port_names.push_back((*it).first); } + +void Superv_Component_i::setTimeOut() +{ + char* valenv=getenv("DSC_TIMEOUT"); + if(valenv) + { + std::istringstream iss(valenv); + long temp; + if (iss >> temp) + if(temp >=0) + Superv_Component_i::dscTimeOut=temp; + } +} + +void Superv_Component_i::beginService(const char *serviceName) +{ + Engines_DSC_i::beginService(serviceName); + setTimeOut(); +} diff --git a/src/DSC/DSC_User/Superv_Component_i.hxx b/src/DSC/DSC_User/Superv_Component_i.hxx index 6c7b1590e..246118160 100644 --- a/src/DSC/DSC_User/Superv_Component_i.hxx +++ b/src/DSC/DSC_User/Superv_Component_i.hxx @@ -44,7 +44,7 @@ * a programming level for service's developpers who want to use DSC ports. * * This class has two level for using and declare ports. The higher level proposes - * operations to add ports that are provided by default by Salomé like Calcium ports. + * operations to add ports that are provided by default by SALOME like Calcium ports. * It provides too some methods to add their own DSC_User ports. * * \note This class doesn't implement the init_service CORBA operation. @@ -256,6 +256,12 @@ public: */ virtual port_factory * get_factory(const std::string & factory_name); + /*! + */ + static long dscTimeOut; + static void setTimeOut(); + void beginService(const char *serviceName); + private: // Factory map typedef std::map factory_map_t; @@ -384,8 +390,7 @@ Superv_Component_i::get_port( const char * port_name) retPort = dynamic_cast(port); if ( retPort == NULL ) { - delete port; - throw BadCast( LOC("Can't cast to asked port type " )); + throw BadCast( LOC("Can't cast to required port type " )); } return retPort; -- 2.39.2