Start initialization phase on application nack
PartialProducer: append sequence number in hello data refs: #4693 Change-Id: I56ac3e331a92b9a79d3bf360571df2914a5374aa
This commit is contained in:
+28
-4
@@ -92,10 +92,34 @@ Consumer::onHelloData(const ndn::Interest& interest, const ndn::Data& data)
|
||||
NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
|
||||
|
||||
State state(data.getContent());
|
||||
std::vector<MissingDataInfo> updates;
|
||||
std::vector<ndn::Name> availableSubscriptions;
|
||||
|
||||
NDN_LOG_DEBUG("Content: " << state);
|
||||
for (const auto& content : state.getContent()) {
|
||||
ndn::Name prefix = content.getPrefix(-1);
|
||||
uint64_t seq = content.get(content.size()-1).toNumber();
|
||||
if (m_prefixes.find(prefix) == m_prefixes.end()) {
|
||||
// In case this the first prefix ever received via hello data,
|
||||
// add it to the available subscriptions
|
||||
availableSubscriptions.push_back(prefix);
|
||||
}
|
||||
else if (seq > m_prefixes[prefix]) {
|
||||
// Else this is not the first time we have seen this prefix,
|
||||
// we must let application know that there is missing data
|
||||
// (scenario: application nack triggers another hello interest)
|
||||
updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
|
||||
m_prefixes[prefix] = seq;
|
||||
}
|
||||
}
|
||||
|
||||
m_onReceiveHelloData(state.getContent());
|
||||
NDN_LOG_DEBUG("Hello Data: " << state);
|
||||
|
||||
m_onReceiveHelloData(availableSubscriptions);
|
||||
|
||||
if (!updates.empty()) {
|
||||
NDN_LOG_DEBUG("Updating application with missed updates");
|
||||
m_onUpdate(updates);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -140,8 +164,8 @@ Consumer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
|
||||
m_iblt = syncDataName.getSubName(syncDataName.size()-1, 1);
|
||||
|
||||
if (data.getContentType() == ndn::tlv::ContentType_Nack) {
|
||||
NDN_LOG_DEBUG("Received application Nack from producer, renew sync interest");
|
||||
sendSyncInterest();
|
||||
NDN_LOG_DEBUG("Received application Nack from producer, send hello again");
|
||||
sendHelloInterest();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
+8
-1
@@ -46,11 +46,17 @@ const ndn::time::milliseconds SYNC_INTEREST_LIFETIME = 1_s;
|
||||
* @brief Consumer logic to subscribe to producer's data
|
||||
*
|
||||
* Application needs to call sendHelloInterest to get the subscription list
|
||||
* in ReceiveHelloCallback. It can then add the desired names using addSubscription.
|
||||
* in psync::ReceiveHelloCallback. It can then add the desired names using addSubscription.
|
||||
* Finally application will call sendSyncInterest. If the application adds something
|
||||
* later to the subscription list then it may call sendSyncInterest again for
|
||||
* sending the next sync interest with updated IBF immediately to reduce any delay in sync data.
|
||||
* Whenever there is new data UpdateCallback will be called to notify the application.
|
||||
*
|
||||
* If consumer wakes up after a long time to sync, producer may not decode the differences
|
||||
* with its old IBF successfully and send an application nack. Upon receiving the nack,
|
||||
* consumer will send a hello again and inform the application via psync::ReceiveHelloCallback
|
||||
* and psync::UpdateCallback.
|
||||
*
|
||||
* Currently, fetching of the data needs to be handled by the application.
|
||||
*/
|
||||
class Consumer
|
||||
@@ -97,6 +103,7 @@ public:
|
||||
* @brief Add prefix to subscription list
|
||||
*
|
||||
* @param prefix prefix to be added to the list
|
||||
* @return true if prefix is added, false if it is already present
|
||||
*/
|
||||
bool
|
||||
addSubscription(const ndn::Name& prefix);
|
||||
|
||||
@@ -79,8 +79,8 @@ PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& i
|
||||
|
||||
State state;
|
||||
|
||||
for (const auto& p : m_prefixes) {
|
||||
state.addContent(p.first);
|
||||
for (const auto& prefix : m_prefixes) {
|
||||
state.addContent(ndn::Name(prefix.first).appendNumber(prefix.second));
|
||||
}
|
||||
NDN_LOG_DEBUG("sending content p: " << state);
|
||||
|
||||
|
||||
@@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(Constructor)
|
||||
util::DummyClientFace face({true, true});
|
||||
BOOST_REQUIRE_NO_THROW(Consumer(Name("/psync"),
|
||||
face,
|
||||
[] (const vector<Name>& availableSubs) {},
|
||||
[] (const vector<MissingDataInfo>) {},
|
||||
[] (const vector<Name>&) {},
|
||||
[] (const vector<MissingDataInfo>&) {},
|
||||
40,
|
||||
0.001));
|
||||
}
|
||||
@@ -47,8 +47,8 @@ BOOST_AUTO_TEST_CASE(AddSubscription)
|
||||
{
|
||||
util::DummyClientFace face({true, true});
|
||||
Consumer consumer(Name("/psync"), face,
|
||||
[] (const vector<Name>& availableSubs) {},
|
||||
[] (const vector<MissingDataInfo>) {},
|
||||
[] (const vector<Name>&) {},
|
||||
[] (const vector<MissingDataInfo>&) {},
|
||||
40, 0.001);
|
||||
|
||||
Name subscription("test");
|
||||
|
||||
@@ -327,8 +327,9 @@ BOOST_AUTO_TEST_CASE(ApplicationNack)
|
||||
// Next sync interest should trigger the nack
|
||||
advanceClocks(ndn::time::milliseconds(15), 100);
|
||||
|
||||
// Nack does not contain any content so still should be 1
|
||||
BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
|
||||
// Application should have been notified that new data is available
|
||||
// from the hello itself.
|
||||
BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
|
||||
|
||||
bool nackRcvd = false;
|
||||
for (const auto& data : face.sentData) {
|
||||
@@ -339,9 +340,9 @@ BOOST_AUTO_TEST_CASE(ApplicationNack)
|
||||
}
|
||||
BOOST_CHECK(nackRcvd);
|
||||
|
||||
producer->publishName("testUser-4");
|
||||
publishUpdateFor("testUser-4");
|
||||
advanceClocks(ndn::time::milliseconds(10));
|
||||
BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
|
||||
BOOST_CHECK_EQUAL(numSyncDataRcvd, 3);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
Reference in New Issue
Block a user