From 96b7b1094863ec5e51a82eb9486ebda94621031a Mon Sep 17 00:00:00 2001 From: fayolle Date: Tue, 24 Apr 2007 16:03:05 +0000 Subject: [PATCH] Correction du BUG (non retour des lectures sequentiels) sur appel cp_fin --- .../Calcium/CalciumCouplingPolicy.cxx | 9 ++-- .../Calcium/CalciumCouplingPolicy.hxx | 44 +++++++++------ .../Calcium/CalciumGenericUsesPort.hxx | 2 +- .../Datastream/Calcium/CalciumInterface.hxx | 21 +++++--- .../Datastream/Calcium/CalciumTypes.hxx | 2 +- .../DSC_User/Datastream/Calcium/Makefile.am | 6 ++- .../Datastream/Calcium/testInterpolation.cxx | 54 +++++++++++++++---- .../DSC_User/Datastream/CouplingPolicy.hxx | 12 +++-- src/DSC/DSC_User/Datastream/GenericPort.hxx | 33 ++++++++---- 9 files changed, 130 insertions(+), 53 deletions(-) diff --git a/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.cxx b/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.cxx index 1e3ae5d4a..c9628f4dd 100644 --- a/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.cxx +++ b/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.cxx @@ -98,7 +98,7 @@ CalciumCouplingPolicy::getEffectiveTime(CalciumCouplingPolicy::TimeType ti, } void CalciumCouplingPolicy::disconnect(bool provideLastGivenValue) { - // TODO Réveiller les ports en attente ! OU timeout ? + if (provideLastGivenValue) { std::cout << "-------- CalciumCouplingPolicy::disconnect CP_CONT ------------------" << std::endl; _disconnectDirective = CalciumTypes::CONTINUE; @@ -106,10 +106,7 @@ void CalciumCouplingPolicy::disconnect(bool provideLastGivenValue) { std::cout << "-------- CalciumCouplingPolicy::disconnect CP_ARRET ------------------" << std::endl; _disconnectDirective = CalciumTypes::STOP; } - //Wakeup get data if any - //wakeupWaiting(); -// if (waitingForAnyDataId || waitingForConvenientDataId); -// cond_instance.signal(); - + //Wakeup get data if any + wakeupWaiting(); } diff --git a/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.hxx b/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.hxx index e08fcd0ae..9de1849c8 100644 --- a/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.hxx +++ b/src/DSC/DSC_User/Datastream/Calcium/CalciumCouplingPolicy.hxx @@ -55,6 +55,7 @@ public: typedef CalciumTypes::DisconnectDirective DisconnectDirective; private: + DependencyType _dependencyType; size_t _storageLevel; DateCalSchem _dateCalSchem; @@ -90,8 +91,6 @@ public: // Classe DataId rassemblant les paramètres de la méthode PORT::put // qui identifient l'instance d'une donnée pour Calcium // Rem : Le DataId doit pouvoir être une key dans une map stl - // typedef CORBA::Double TimeType; - // typedef CORBA::Long TagType; typedef double TimeType; typedef long TagType; typedef std::pair< TimeType , TagType > DataId; @@ -124,7 +123,6 @@ public: TimeType getEffectiveTime(TimeType ti, TimeType tf); void disconnect(bool provideLastGivenValue); - virtual void wakeupWaiting(){}; }; //Fin de CalciumCouplingPolicy @@ -164,15 +162,15 @@ struct CalciumCouplingPolicy::InternalDataIdContainer : public std::vector< std: template struct CalciumCouplingPolicy::BoundedDataIdProcessor{ - CalciumCouplingPolicy _couplingPolicy; + const CalciumCouplingPolicy & _couplingPolicy; - BoundedDataIdProcessor(CalciumCouplingPolicy couplingPolicy): + BoundedDataIdProcessor(const CalciumCouplingPolicy &couplingPolicy): _couplingPolicy(couplingPolicy) {}; // Méthode implémentant l'interpolation temporelle template < typename MapIterator > void inline apply (typename iterator_t::value_type & data, - const DataId & dataId, const MapIterator & it1) { + const DataId & dataId, const MapIterator & it1) const { typedef typename iterator_t::value_type value_type; typedef typename DataManipulator::InnerType InnerType; @@ -221,7 +219,11 @@ struct CalciumCouplingPolicy::BoundedDataIdProcessor{ // REM : Pour des buffers de type int // le compilo indiquera warning: converting to `long int' from `Double' std::transform(InIt1,InIt1+dataSize,InIt2,OutIt, - ( _1 - _2 ) * coeff + _2 ); + ( _1 - _2 ) * coeff + _2 ); +// for(size_t i =0; i < dataSize3; ++i) { +// OutIt[i]=(InIt1[i] - InIt2[i]) * coeff + InIt2[i]; +// } + } std::cout << "-------- CalciumCouplingPolicy::BoundedDataIdProcessor : Données calculées à t : " << std::endl; std::copy(OutIt,OutIt+dataSize,std::ostream_iterator(std::cout," ")); @@ -303,9 +305,9 @@ bool CalciumCouplingPolicy::isDataIdConveniant( AssocContainer & storedDatas, co template < typename DataManipulator > struct CalciumCouplingPolicy::EraseDataIdProcessor { - CalciumCouplingPolicy _couplingPolicy; + CalciumCouplingPolicy &_couplingPolicy; - EraseDataIdProcessor(CalciumCouplingPolicy couplingPolicy): + EraseDataIdProcessor(CalciumCouplingPolicy &couplingPolicy): _couplingPolicy(couplingPolicy) {}; template < typename Container > @@ -350,9 +352,9 @@ struct CalciumCouplingPolicy::EraseDataIdProcessor { template < typename DataManipulator > struct CalciumCouplingPolicy::DisconnectProcessor { - CalciumCouplingPolicy _couplingPolicy; + const CalciumCouplingPolicy & _couplingPolicy; - DisconnectProcessor(CalciumCouplingPolicy couplingPolicy): + DisconnectProcessor(const CalciumCouplingPolicy & couplingPolicy): _couplingPolicy(couplingPolicy) {}; template < typename Container, typename DataId > @@ -365,14 +367,17 @@ struct CalciumCouplingPolicy::DisconnectProcessor { typedef typename Container::iterator iterator; // Pas de traitement particulier a effectuer - std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK1 --------" << std::endl; - if ( _couplingPolicy._disconnectDirective == CalciumTypes::UNDEFINED_DIRECTIVE ) return false; - std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor, CP_CONT : " << (*wDataIt1).first << std::endl; + std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK1 ("<< _couplingPolicy._disconnectDirective<<") --------" << std::endl; + if ( (_couplingPolicy._disconnectDirective) == (CalciumTypes::UNDEFINED_DIRECTIVE) ) return false; + std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK2 --------" << std::endl; + + if ( _couplingPolicy._disconnectDirective == CalciumTypes::CP_ARRET ) throw(DATASTREAM_EXCEPTION(LOC(OSS()<< "La directive CP_ARRET" << " provoque l'interruption de toute lecture de données"))); std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK3 --------" << std::endl; + // S'il n'y a plus de données indique que l'on a pas pu effectuer de traitement if ( storedDatas.empty() ) throw(DATASTREAM_EXCEPTION(LOC(OSS()<< "La directive CP_CONT" @@ -382,13 +387,20 @@ struct CalciumCouplingPolicy::DisconnectProcessor { // qu'en mode itératif il ne soit pas plus grand que le plus grand DataId stocké auquel // cas on doit renvoyer une expection car on n'est plus connecté et on ne pourra jamais // fournir de données pour ce dataId. - std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK4 --------" << std::endl; + std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK4 " << expectedDataId <<" --------" << std::endl; + // >= expectedDataId iterator it1 = storedDatas.lower_bound(expectedDataId); - if (it1 != storedDatas.end()) + std::cout << "-------- CalciumCouplingPolicy::DisconnectProcessor MARK5 " << std::endl; + for (iterator it=storedDatas.begin();it!=storedDatas.end();++it) + std::cout <<" "<<(*it).first ; + std::cout <::disconn port->disconnect(provideLastGivenValue); } catch(const CORBA::SystemException& ex){ throw DSC_Exception(LOC(OSS() << "Impossible d'invoquer la méthode disconnect sur le port provide n°" - << i << "( i>= 0)")); + << i << " ( i>= 0)")); } } diff --git a/src/DSC/DSC_User/Datastream/Calcium/CalciumInterface.hxx b/src/DSC/DSC_User/Datastream/Calcium/CalciumInterface.hxx index c7cf475ff..ad33a0d3a 100644 --- a/src/DSC/DSC_User/Datastream/Calcium/CalciumInterface.hxx +++ b/src/DSC/DSC_User/Datastream/Calcium/CalciumInterface.hxx @@ -69,7 +69,7 @@ public : std::vector::const_iterator it; component.get_uses_port_names(usesPortNames); - //récupérer le type de réel su port est un peu difficile + //récupérer le type de réel du port est un peu difficile //car l'interface nous donne aucune indication uses_port *myUsesPort; @@ -87,14 +87,23 @@ public : myCalciumUsesPort->disconnect(provideLastGivenValue); } catch ( const Superv_Component_i::PortNotDefined & ex) { std::cerr << ex.what() << std::endl; - throw (DatastreamException(CalciumTypes::CPNMVR,ex)); + //throw (DatastreamException(CalciumTypes::CPNMVR,ex)); + // On continue à traiter la deconnexion des autres ports uses } catch ( const Superv_Component_i::PortNotConnected & ex) { - std::cerr << ex.what() << std::endl;; - throw (DatastreamException(CalciumTypes::CPLIEN,ex)); - } catch ( const Superv_Component_i::BadCast & ex) { + std::cerr << ex.what() << std::endl; + // throw (DatastreamException(CalciumTypes::CPLIEN,ex)); + // On continue à traiter la deconnexion des autres ports uses + } catch ( const Superv_Component_i::BadCast & ex) { std::cerr << ex.what() << std::endl; throw (DatastreamException(CalciumTypes::CPTPVR,ex)); - } // Laisse passer les autres exceptions. + } catch ( const DSC_Exception & ex) { + // exception venant du port uses + std::cerr << ex.what() << std::endl; + // On continue à traiter la deconnexion des autres ports uses + } catch (...) {// On laisse passer les autres exceptions + std::cout << "ecp_fin : Exception innatendue " < + #include #include #include +#include +#include +#include struct MyRand { static const double MAXVALUE = 150.0; @@ -41,25 +45,30 @@ struct MyRand { int main() { - typedef double Type; + typedef long Type; typedef double TimeType; const int dataSize1=20; const int dataSize2=30; const int dataSize3=std::min< size_t >(dataSize1,dataSize2); - std::vector vect1(dataSize1),vect2(dataSize2),vect3(dataSize3); + std::vector vect1(dataSize1),vect2(dataSize2),vect3(dataSize3),vect4(dataSize3); MyRand myRand; //TEST1 std::generate(vect1.begin(),vect1.end(),myRand); std::cout << "Vecteur1 généré aléatoirement :" << std::endl; - copy(vect1.begin(),vect1.end(),std::ostream_iterator(std::cout," ")); + std::copy(vect1.begin(),vect1.end(),std::ostream_iterator(std::cout," ")); std::cout<< std::endl; std::generate(vect2.begin(),vect2.end(),myRand); std::cout << "Vecteur2 généré aléatoirement :" << std::endl; - copy(vect2.begin(),vect2.end(),std::ostream_iterator(std::cout," ")); + std::copy(vect2.begin(),vect2.end(),std::ostream_iterator(std::cout," ")); std::cout<< std::endl; - std::vector::iterator InIt1=vect1.begin(),InIt2=vect2.begin(),OutIt=vect3.begin(); + + std::vector::iterator + InIt1=vect1.begin(), + InIt2=vect2.begin(), + OutIt1=vect3.begin(), + OutIt2=vect4.begin(); TimeType t = 2.4; TimeType t2 = 3.4; @@ -67,15 +76,42 @@ int main() { TimeType deltaT = t2-t1; TimeType coeff = (t2-t)/deltaT; + // Calcul avec Lambda boost::lambda::placeholder1_type _1; boost::lambda::placeholder2_type _2; - std::transform(InIt1,InIt1+dataSize3,InIt2,OutIt, ( _1 - _2 ) * coeff + _2 ); + std::transform(InIt1,InIt1+dataSize3,InIt2,OutIt1, ( _1 - _2 ) * coeff + _2 ); std::cout << "Vecteur3 calculé :" << std::endl; - copy(vect3.begin(),vect3.end(),std::ostream_iterator(std::cout," ")); + std::copy(vect3.begin(),vect3.end(),std::ostream_iterator(std::cout," ")); std::cout<< std::endl; - + + //Calcul sans Lambda + // ERREUR : il faut produire une binary pas avec compose2 + // std::transform(InIt1,InIt1+dataSize3,InIt2,OutIt2, + // //std::minus(), + // __gnu_cxx::compose2(std::minus(), + // // __gnu_cxx::identity(), + // std::bind2nd( std::multiplies(), 1. ), + // std::bind2nd( std::multiplies(), 1.1 ) ) + // ); + // InIt2 =vect2.begin(); + // OutIt2=vect4.begin(); + + // std::transform(InIt2,InIt2+dataSize3,OutIt2,OutIt2, + // std::plus() ); + + // Calcul direct + InIt1=vect1.begin(); InIt2=vect2.begin();OutIt2=vect4.begin(); + for(int i =0; i < dataSize3; ++i) { +// *OutIt2=(*InIt1 - *InIt2) * coeff + *InIt2; +// ++InIt1;++InIt2;++OutIt2; + OutIt2[i]=(InIt1[i] - InIt2[i]) * coeff + InIt2[i]; + } + + std::cout << "Vecteur4 calculé :" << std::endl; + std::copy(vect4.begin(),vect4.end(),std::ostream_iterator(std::cout," ")); + std::cout<< std::endl; }; diff --git a/src/DSC/DSC_User/Datastream/CouplingPolicy.hxx b/src/DSC/DSC_User/Datastream/CouplingPolicy.hxx index 016cceab3..0ef2b6c10 100644 --- a/src/DSC/DSC_User/Datastream/CouplingPolicy.hxx +++ b/src/DSC/DSC_User/Datastream/CouplingPolicy.hxx @@ -95,11 +95,11 @@ public: // le cas d'une demande de dataId inexistant mais encadré par deux autres template struct BoundedDataIdProcessor{ - BoundedDataIdProcessor(CouplingPolicy couplingPolicy) {}; + BoundedDataIdProcessor(const CouplingPolicy & couplingPolicy) {}; template < typename Iterator, typename DataId > void inline apply(typename iterator_t::value_type & data, - const DataId & dataId, - const Iterator & it1) { + const DataId & dataId, + const Iterator & it1) const { typedef typename iterator_t::value_type value_type; std::cout << "-------- Generic BoundedDataIdProcessor.apply() called " << std::endl; @@ -135,7 +135,7 @@ public: template < typename DataManipulator > struct DisconnectProcessor { - DisconnectProcessor(CouplingPolicy couplingPolicy) {}; + DisconnectProcessor(const CouplingPolicy & couplingPolicy) {}; template < typename Container, typename DataId > bool apply(Container & storedDatas, @@ -150,6 +150,10 @@ public: } }; + // Permet de réveiller les méthodes d'un GenericPort en attente + // depuis une CouplingPolicy + virtual void wakeupWaiting(){}; + virtual ~CouplingPolicy() {} }; diff --git a/src/DSC/DSC_User/Datastream/GenericPort.hxx b/src/DSC/DSC_User/Datastream/GenericPort.hxx index 9c63ec5de..2c01d54f5 100644 --- a/src/DSC/DSC_User/Datastream/GenericPort.hxx +++ b/src/DSC/DSC_User/Datastream/GenericPort.hxx @@ -121,12 +121,14 @@ template < typename DataManipulator, typename COUPLING_POLICY> void GenericPort::wakeupWaiting() { std::cout << "-------- wakeupWaiting ------------------" << std::endl; - if (waitingForAnyDataId || waitingForConvenientDataId) - { + storedDatas_mutex.lock(); + if (waitingForAnyDataId || waitingForConvenientDataId) { std::cout << "-------- wakeupWaiting:signal --------" << std::endl; std::cout << std::flush; cond_instance.signal(); - } + } + storedDatas_mutex.unlock(); + } /* Methode put_generique @@ -241,7 +243,7 @@ void GenericPort::put(CorbaInDataType dataPara // Par construction, les valeurs de waitingForAnyDataId, waitingForConvenientDataId et de // expectedDataId ne peuvent pas être modifiées pendant le traitement de la boucle // sur les dataIds (à cause du lock utilisé dans la méthode put et les méthodes get ) - // rem : Utilisation de l'évaluation gauche droite su logical C or + // rem : Utilisation de l'évaluation gauche droite du logical C or if ( waitingForAnyDataId || ( waitingForConvenientDataId && isDataIdConveniant(storedDatas, expectedDataId, dummy1, dummy2, dummy3) ) @@ -476,6 +478,14 @@ GenericPort::next(TimeType &t, wDataIt1 = storedDatas.end(); //Recherche le prochain dataId à renvoyer + // - lastDataIdset == true indique que lastDataId + // contient le dernier DataId renvoyé + // - lastDataIdset == false indique que l'on renverra + // le premier dataId trouvé + // - upper_bound(lastDataId) situe le prochain DataId + // à renvoyer + // Rem : les données renvoyées ne sont effacées par eraseDataIds + // si necessaire if (lastDataIdSet) wDataIt1 = storedDatas.upper_bound(lastDataId); else if ( !storedDatas.empty() ) { @@ -483,13 +493,16 @@ GenericPort::next(TimeType &t, wDataIt1 = storedDatas.begin(); } + typename COUPLING_POLICY::template DisconnectProcessor processDisconnect(*this); + while ( storedDatas.empty() || wDataIt1 == storedDatas.end() ) { // Délègue au mode de couplage la gestion d'une demande de donnée non disponible // si le port est deconnecté - typename COUPLING_POLICY::template DisconnectProcessor processDisconnect(*this); - if ( processDisconnect.apply(storedDatas, lastDataId, wDataIt1) ) break; - + if ( processDisconnect.apply(storedDatas, lastDataId, wDataIt1) ) { + waitingForAnyDataId = false; break; + } + std::cout << "-------- Next : MARK 2 ------------------" << std::endl; //Positionné à faux dans la méthode put waitingForAnyDataId = true; @@ -498,11 +511,11 @@ GenericPort::next(TimeType &t, std::cout << "-------- Next : waiting datas ------------------" << std::endl; fflush(stdout);fflush(stderr); cond_instance.wait(); - + if (lastDataIdSet) { std::cout << "-------- Next : MARK 4 ------------------" << std::endl; wDataIt1 = storedDatas.upper_bound(lastDataId); - } else { + } else { std::cout << "-------- Next : MARK 5 ------------------" << std::endl; lastDataIdSet = true; wDataIt1 = storedDatas.begin(); @@ -523,6 +536,8 @@ GenericPort::next(TimeType &t, std::cout << "-------- Next : MARK 8 ------------------" << std::endl; } catch (...) { + std::cout << "-------- Next : MARK 8bis ------------------" << std::endl; + waitingForAnyDataId = false; storedDatas_mutex.unlock(); throw; } -- 2.39.2