logic: Implement Recovery mechanism

This commit implements the recovery mechanism that is critical for
handling network partitions.  When a node receives a sync interest with
unrecognizable digest, it goes into recovery.

Change-Id: I205687b9791b286cf6eca4c0159b49f744b38bed
Refs: #3929
This commit is contained in:
Sonu Mishra
2017-01-18 20:27:51 -08:00
committed by Alexander Afanasyev
parent e10acbc260
commit 4d3a2e09ff
4 changed files with 218 additions and 25 deletions
+1
View File
@@ -5,3 +5,4 @@ ChronoSync authors
- Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
- Chaoyi Bian <bcy@pku.edu.cn>
- Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
- Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
+72 -9
View File
@@ -58,9 +58,11 @@ const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::mil
const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
const ndn::name::Component Logic::RESET_COMPONENT("reset");
const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Logic::Logic(ndn::Face& face,
const Name& syncPrefix,
@@ -72,7 +74,8 @@ Logic::Logic(ndn::Face& face,
const time::steady_clock::Duration& cancelResetTimer,
const time::milliseconds& resetInterestLifetime,
const time::milliseconds& syncInterestLifetime,
const time::milliseconds& syncReplyFreshness)
const time::milliseconds& syncReplyFreshness,
const time::milliseconds& recoveryInterestLifetime)
: m_face(face)
, m_syncPrefix(syncPrefix)
, m_defaultUserPrefix(defaultUserPrefix)
@@ -90,6 +93,7 @@ Logic::Logic(ndn::Face& face,
, m_resetInterestLifetime(resetInterestLifetime)
, m_syncInterestLifetime(syncInterestLifetime)
, m_syncReplyFreshness(syncReplyFreshness)
, m_recoveryInterestLifetime(recoveryInterestLifetime)
, m_defaultSigningId(defaultSigningId)
, m_validator(validator)
{
@@ -316,13 +320,15 @@ Logic::onSyncInterest(const Name& prefix, const Interest& interest)
_LOG_DEBUG_ID("InterestName: " << name);
if (RESET_COMPONENT != name.get(-1)) {
// normal sync interest
if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
processResetInterest(interest);
}
else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
processRecoveryInterest(interest);
}
else {
processSyncInterest(interest.shared_from_this());
}
else
// reset interest
processResetInterest(interest);
_LOG_DEBUG_ID("<< Logic::onSyncInterest");
}
@@ -451,9 +457,9 @@ Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
}
else {
// OK, nobody is helping us, just tell the truth.
_LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
_LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
m_interestTable.erase(digest);
sendSyncData(m_defaultUserPrefix, name, m_state);
sendRecoveryInterest(digest);
}
_LOG_DEBUG_ID("<< Logic::processSyncInterest");
@@ -687,4 +693,61 @@ Logic::printDigest(ndn::ConstBufferPtr digest)
_LOG_DEBUG_ID("Hash: " << hash);
}
} // namespace chronosync
void
Logic::sendRecoveryInterest(ndn::ConstBufferPtr digest)
{
_LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
Name interestName;
interestName.append(m_syncPrefix)
.append(RECOVERY_COMPONENT)
.append(ndn::name::Component(*digest));
Interest interest(interestName);
interest.setMustBeFresh(true);
interest.setInterestLifetime(m_recoveryInterestLifetime);
m_face.expressInterest(interest, bind(&Logic::onRecoveryData, this, _1, _2),
bind(&Logic::onRecoveryTimeout, this, _1));
_LOG_DEBUG_ID("interest: " << interest.getName());
_LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
}
void
Logic::processRecoveryInterest(const Interest& interest)
{
_LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
const Name& name = interest.getName();
ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
ConstBufferPtr rootDigest = m_state.getRootDigest();
DiffStateContainer::iterator stateIter = m_log.find(digest);
if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
_LOG_DEBUG_ID("I can help you recover");
sendSyncData(m_defaultUserPrefix, name, m_state);
return;
}
_LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
}
void
Logic::onRecoveryData(const Interest& interest, Data& data)
{
_LOG_DEBUG_ID(">> Logic::onRecoveryData");
onSyncDataValidated(data.shared_from_this());
_LOG_DEBUG_ID("<< Logic::onRecoveryData");
}
void
Logic::onRecoveryTimeout(const Interest& interest)
{
_LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
_LOG_DEBUG_ID("Interest: " << interest.getName());
_LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
}
} // namespace chronosync
+52 -4
View File
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2014 University of California, Los Angeles
* Copyright (c) 2012-2017 University of California, Los Angeles
*
* This file is part of ChronoSync, synchronization library for distributed realtime
* applications for NDN.
@@ -20,6 +20,7 @@
* @author Chaoyi Bian <bcy@pku.edu.cn>
* @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
* @author Yingdi Yu <yingdi@cs.ucla.edu>
* @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
*/
#ifndef CHRONOSYNC_LOGIC_HPP
@@ -48,7 +49,8 @@ namespace chronosync {
* Instances of this class is usually used as elements of some containers
* such as std::vector, thus it is copyable.
*/
class NodeInfo {
class NodeInfo
{
public:
Name userPrefix;
Name signingId;
@@ -97,6 +99,7 @@ public:
static const time::milliseconds DEFAULT_RESET_INTEREST_LIFETIME;
static const time::milliseconds DEFAULT_SYNC_INTEREST_LIFETIME;
static const time::milliseconds DEFAULT_SYNC_REPLY_FRESHNESS;
static const time::milliseconds DEFAULT_RECOVERY_INTEREST_LIFETIME;
/**
* @brief Constructor
@@ -123,7 +126,8 @@ public:
const time::steady_clock::Duration& cancelResetTimer = DEFAULT_CANCEL_RESET_TIMER,
const time::milliseconds& resetInterestLifetime = DEFAULT_RESET_INTEREST_LIFETIME,
const time::milliseconds& syncInterestLifetime = DEFAULT_SYNC_INTEREST_LIFETIME,
const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS);
const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS,
const time::milliseconds& recoveryInterestLifetime = DEFAULT_RECOVERY_INTEREST_LIFETIME);
~Logic();
@@ -387,6 +391,47 @@ private:
void
printDigest(ndn::ConstBufferPtr digest);
/**
* @brief Helper method to send Recovery Interest
*
* @param digest The digest to be included in the recovery interest
*/
void
sendRecoveryInterest(ndn::ConstBufferPtr digest);
/**
* @brief Process Recovery Interest
*
* This method extracts the digest from the incoming Recovery Interest.
* If it recognizes this incoming digest, then it sends its full state
* as reply.
*
* @param interest The incoming interest
*/
void
processRecoveryInterest(const Interest& interest);
/**
* @brief Callback to handle Recovery Reply
*
* This method calls Logic::onSyncDataValidated directly.
*
* @param interest The Recovery Interest
* @param data The reply to the Recovery Interest
*/
void
onRecoveryData(const Interest& interest, Data& data);
/**
* @brief Callback to handle Recovery Interest timeout.
*
* This method does nothing.
*
* @param interest The Recovery Interest
*/
void
onRecoveryTimeout(const Interest& interest);
public:
static const ndn::Name DEFAULT_NAME;
static const ndn::Name EMPTY_NAME;
@@ -397,6 +442,7 @@ private:
static const ndn::ConstBufferPtr EMPTY_DIGEST;
static const ndn::name::Component RESET_COMPONENT;
static const ndn::name::Component RECOVERY_COMPONENT;
// Communication
ndn::Face& m_face;
@@ -438,6 +484,8 @@ private:
time::milliseconds m_syncInterestLifetime;
/// @brief FreshnessPeriod of SyncReply
time::milliseconds m_syncReplyFreshness;
/// @brief Lifetime of recovery interest
time::milliseconds m_recoveryInterestLifetime;
// Security
ndn::Name m_defaultSigningId;
@@ -454,4 +502,4 @@ private:
} // namespace chronosync
#endif // CHRONOSYNC_LOGIC_HPP
#endif // CHRONOSYNC_LOGIC_HPP
+93 -12
View File
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2016 University of California, Los Angeles
* Copyright (c) 2012-2017 University of California, Los Angeles
*
* This file is part of ChronoSync, synchronization library for distributed realtime
* applications for NDN.
@@ -76,12 +76,14 @@ public:
userPrefix[0] = Name("/user0");
userPrefix[1] = Name("/user1");
userPrefix[2] = Name("/user2");
userPrefix[3] = Name("/user3");
faces[0].reset(new DummyClientFace(io, {true, true}));
faces[1].reset(new DummyClientFace(io, {true, true}));
faces[2].reset(new DummyClientFace(io, {true, true}));
faces[3].reset(new DummyClientFace(io, {true, true}));
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 4; i++) {
readInterestOffset[i] = 0;
readDataOffset[i] = 0;
}
@@ -90,7 +92,7 @@ public:
void
passPacket()
{
for (int i = 0; i < 3; i++)
for (int i = 0; i < 4; i++)
checkFace(i);
}
@@ -98,14 +100,14 @@ public:
checkFace(int sender)
{
while (faces[sender]->sentInterests.size() > readInterestOffset[sender]) {
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 4; i++) {
if (sender != i)
faces[i]->receive(faces[sender]->sentInterests[readInterestOffset[sender]]);
}
readInterestOffset[sender]++;
}
while (faces[sender]->sentData.size() > readDataOffset[sender]) {
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 4; i++) {
if (sender != i)
faces[i]->receive(faces[sender]->sentData[readDataOffset[sender]]);
}
@@ -115,13 +117,13 @@ public:
public:
Name syncPrefix;
Name userPrefix[3];
Name userPrefix[4];
std::unique_ptr<DummyClientFace> faces[3];
shared_ptr<Handler> handler[3];
std::unique_ptr<DummyClientFace> faces[4];
shared_ptr<Handler> handler[4];
size_t readInterestOffset[3];
size_t readDataOffset[3];
size_t readInterestOffset[4];
size_t readDataOffset[4];
};
BOOST_FIXTURE_TEST_SUITE(LogicTests, LogicFixture)
@@ -245,8 +247,8 @@ BOOST_AUTO_TEST_CASE(ResetRecover)
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 1);
BOOST_CHECK_EQUAL(handler[0]->map[handler[1]->logic.getSessionName()], 2);
BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 1);
BOOST_CHECK_EQUAL(handler[2]->map[handler[1]->logic.getSessionName()], 2);
handler[2]->updateSeqNo(4);
@@ -291,6 +293,85 @@ BOOST_AUTO_TEST_CASE(RecoverConflict)
BOOST_CHECK_EQUAL(handler[2]->map[handler[1]->logic.getSessionName()], 2);
}
BOOST_AUTO_TEST_CASE(PartitionRecover)
{
handler[0] = make_shared<Handler>(ref(*faces[0]), syncPrefix, userPrefix[0]);
advanceClocks(ndn::time::milliseconds(10), 10);
handler[1] = make_shared<Handler>(ref(*faces[1]), syncPrefix, userPrefix[1]);
advanceClocks(ndn::time::milliseconds(10), 10);
handler[2] = make_shared<Handler>(ref(*faces[2]), syncPrefix, userPrefix[2]);
advanceClocks(ndn::time::milliseconds(10), 10);
handler[3] = make_shared<Handler>(ref(*faces[3]), syncPrefix, userPrefix[3]);
advanceClocks(ndn::time::milliseconds(10), 30);
handler[0]->updateSeqNo(1);
for (int i = 0; i < 50; i++) {
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 1);
BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 1);
BOOST_CHECK_EQUAL(handler[3]->map[handler[0]->logic.getSessionName()], 1);
handler[2]->updateSeqNo(2);
for (int i = 0; i < 50; i++) {
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[0]->map[handler[2]->logic.getSessionName()], 2);
BOOST_CHECK_EQUAL(handler[1]->map[handler[2]->logic.getSessionName()], 2);
BOOST_CHECK_EQUAL(handler[3]->map[handler[2]->logic.getSessionName()], 2);
// Network Partition start
handler[1]->updateSeqNo(3);
for (int i = 0; i < 50; i++) {
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[0]->map[handler[1]->logic.getSessionName()], 3);
handler[2]->map[handler[1]->logic.getSessionName()] = 0;
handler[3]->map[handler[1]->logic.getSessionName()] = 0;
handler[3]->updateSeqNo(4);
for (int i = 0; i < 50; i++) {
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[2]->map[handler[3]->logic.getSessionName()], 4);
handler[0]->map[handler[3]->logic.getSessionName()] = 0;
handler[1]->map[handler[3]->logic.getSessionName()] = 0;
// Network partition over
handler[0]->updateSeqNo(5);
for (int i = 0; i < 50; i++) {
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 5);
BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 5);
BOOST_CHECK_EQUAL(handler[3]->map[handler[0]->logic.getSessionName()], 5);
handler[2]->updateSeqNo(6);
for (int i = 0; i < 50; i++) {
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
BOOST_CHECK_EQUAL(handler[0]->map[handler[2]->logic.getSessionName()], 6);
BOOST_CHECK_EQUAL(handler[1]->map[handler[2]->logic.getSessionName()], 6);
BOOST_CHECK_EQUAL(handler[3]->map[handler[2]->logic.getSessionName()], 6);
}
BOOST_AUTO_TEST_CASE(MultipleUserUnderOneLogic)
{
handler[0] = make_shared<Handler>(ref(*faces[0]), syncPrefix, userPrefix[0]);