From 4d3a2e09ffc19f5ff2064c74f2bb1e8faf2622f3 Mon Sep 17 00:00:00 2001 From: Sonu Mishra Date: Wed, 18 Jan 2017 20:27:51 -0800 Subject: [PATCH] logic: Implement Recovery mechanism This commit implements the recovery mechanism that is critical for handling network partitions. When a node receives a sync interest with an unrecognizable digest, it goes into recovery. Change-Id: I205687b9791b286cf6eca4c0159b49f744b38bed Refs: #3929 --- AUTHORS.md | 1 + src/logic.cpp | 81 +++++++++++++++++++++--- src/logic.hpp | 56 +++++++++++++++-- tests/unit-tests/test-logic.cpp | 105 ++++++++++++++++++++++++++++---- 4 files changed, 218 insertions(+), 25 deletions(-) diff --git a/AUTHORS.md b/AUTHORS.md index 58fac9b..88f6905 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -5,3 +5,4 @@ ChronoSync authors - Alexander Afanasyev - Chaoyi Bian - Yingdi Yu +- Sonu Mishra diff --git a/src/logic.cpp b/src/logic.cpp index a110328..aa08acc 100644 --- a/src/logic.cpp +++ b/src/logic.cpp @@ -58,9 +58,11 @@ const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::mil const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000); const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000); const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000); +const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000); const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32)); const ndn::name::Component Logic::RESET_COMPONENT("reset"); +const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery"); Logic::Logic(ndn::Face& face, const Name& syncPrefix, @@ -72,7 +74,8 @@ Logic::Logic(ndn::Face& face, const time::steady_clock::Duration& cancelResetTimer, const time::milliseconds& resetInterestLifetime, const time::milliseconds& syncInterestLifetime, - const time::milliseconds& syncReplyFreshness) + const time::milliseconds& syncReplyFreshness, + const time::milliseconds& recoveryInterestLifetime) : m_face(face) , 
m_syncPrefix(syncPrefix) , m_defaultUserPrefix(defaultUserPrefix) @@ -90,6 +93,7 @@ Logic::Logic(ndn::Face& face, , m_resetInterestLifetime(resetInterestLifetime) , m_syncInterestLifetime(syncInterestLifetime) , m_syncReplyFreshness(syncReplyFreshness) + , m_recoveryInterestLifetime(recoveryInterestLifetime) , m_defaultSigningId(defaultSigningId) , m_validator(validator) { @@ -316,13 +320,15 @@ Logic::onSyncInterest(const Name& prefix, const Interest& interest) _LOG_DEBUG_ID("InterestName: " << name); - if (RESET_COMPONENT != name.get(-1)) { - // normal sync interest + if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) { + processResetInterest(interest); + } + else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) { + processRecoveryInterest(interest); + } + else { processSyncInterest(interest.shared_from_this()); } - else - // reset interest - processResetInterest(interest); _LOG_DEBUG_ID("<< Logic::onSyncInterest"); } @@ -451,9 +457,9 @@ Logic::processSyncInterest(const shared_ptr& interest, } else { // OK, nobody is helping us, just tell the truth. 
- _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth"); + _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover"); m_interestTable.erase(digest); - sendSyncData(m_defaultUserPrefix, name, m_state); + sendRecoveryInterest(digest); } _LOG_DEBUG_ID("<< Logic::processSyncInterest"); @@ -687,4 +693,61 @@ Logic::printDigest(ndn::ConstBufferPtr digest) _LOG_DEBUG_ID("Hash: " << hash); } -} // namespace chronosync +void +Logic::sendRecoveryInterest(ndn::ConstBufferPtr digest) +{ + _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest"); + + Name interestName; + interestName.append(m_syncPrefix) + .append(RECOVERY_COMPONENT) + .append(ndn::name::Component(*digest)); + + Interest interest(interestName); + interest.setMustBeFresh(true); + interest.setInterestLifetime(m_recoveryInterestLifetime); + + m_face.expressInterest(interest, bind(&Logic::onRecoveryData, this, _1, _2), + bind(&Logic::onRecoveryTimeout, this, _1)); + + _LOG_DEBUG_ID("interest: " << interest.getName()); + _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest"); +} + +void +Logic::processRecoveryInterest(const Interest& interest) +{ + _LOG_DEBUG_ID(">> Logic::processRecoveryInterest"); + + const Name& name = interest.getName(); + ConstBufferPtr digest = make_shared(name.get(-1).value(), name.get(-1).value_size()); + + ConstBufferPtr rootDigest = m_state.getRootDigest(); + + DiffStateContainer::iterator stateIter = m_log.find(digest); + + if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) { + _LOG_DEBUG_ID("I can help you recover"); + sendSyncData(m_defaultUserPrefix, name, m_state); + return; + } + _LOG_DEBUG_ID("<< Logic::processRecoveryInterest"); +} + +void +Logic::onRecoveryData(const Interest& interest, Data& data) +{ + _LOG_DEBUG_ID(">> Logic::onRecoveryData"); + onSyncDataValidated(data.shared_from_this()); + _LOG_DEBUG_ID("<< Logic::onRecoveryData"); +} + +void +Logic::onRecoveryTimeout(const Interest& interest) +{ + _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout"); 
+ _LOG_DEBUG_ID("Interest: " << interest.getName()); + _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout"); +} + +} // namespace chronosync \ No newline at end of file diff --git a/src/logic.hpp b/src/logic.hpp index d4a811f..c684fa2 100644 --- a/src/logic.hpp +++ b/src/logic.hpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2014 University of California, Los Angeles + * Copyright (c) 2012-2017 University of California, Los Angeles * * This file is part of ChronoSync, synchronization library for distributed realtime * applications for NDN. @@ -20,6 +20,7 @@ * @author Chaoyi Bian * @author Alexander Afanasyev * @author Yingdi Yu + * @author Sonu Mishra */ #ifndef CHRONOSYNC_LOGIC_HPP @@ -48,7 +49,8 @@ namespace chronosync { * Instances of this class is usually used as elements of some containers * such as std::vector, thus it is copyable. */ -class NodeInfo { +class NodeInfo +{ public: Name userPrefix; Name signingId; @@ -97,6 +99,7 @@ public: static const time::milliseconds DEFAULT_RESET_INTEREST_LIFETIME; static const time::milliseconds DEFAULT_SYNC_INTEREST_LIFETIME; static const time::milliseconds DEFAULT_SYNC_REPLY_FRESHNESS; + static const time::milliseconds DEFAULT_RECOVERY_INTEREST_LIFETIME; /** * @brief Constructor @@ -123,7 +126,8 @@ public: const time::steady_clock::Duration& cancelResetTimer = DEFAULT_CANCEL_RESET_TIMER, const time::milliseconds& resetInterestLifetime = DEFAULT_RESET_INTEREST_LIFETIME, const time::milliseconds& syncInterestLifetime = DEFAULT_SYNC_INTEREST_LIFETIME, - const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS); + const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS, + const time::milliseconds& recoveryInterestLifetime = DEFAULT_RECOVERY_INTEREST_LIFETIME); ~Logic(); @@ -387,6 +391,47 @@ private: void printDigest(ndn::ConstBufferPtr digest); + /** + * @brief Helper method to send Recovery Interest + * + * @param digest The 
digest to be included in the recovery interest + */ + void + sendRecoveryInterest(ndn::ConstBufferPtr digest); + + /** + * @brief Process Recovery Interest + * + * This method extracts the digest from the incoming Recovery Interest. + * If it recognizes this incoming digest, then it sends its full state + * as reply. + * + * @param interest The incoming interest + */ + void + processRecoveryInterest(const Interest& interest); + + /** + * @brief Callback to handle Recovery Reply + * + * This method calls Logic::onSyncDataValidated directly. + * + * @param interest The Recovery Interest + * @param data The reply to the Recovery Interest + */ + void + onRecoveryData(const Interest& interest, Data& data); + + /** + * @brief Callback to handle Recovery Interest timeout. + * + * This method does nothing. + * + * @param interest The Recovery Interest + */ + void + onRecoveryTimeout(const Interest& interest); + public: static const ndn::Name DEFAULT_NAME; static const ndn::Name EMPTY_NAME; @@ -397,6 +442,7 @@ private: static const ndn::ConstBufferPtr EMPTY_DIGEST; static const ndn::name::Component RESET_COMPONENT; + static const ndn::name::Component RECOVERY_COMPONENT; // Communication ndn::Face& m_face; @@ -438,6 +484,8 @@ private: time::milliseconds m_syncInterestLifetime; /// @brief FreshnessPeriod of SyncReply time::milliseconds m_syncReplyFreshness; + /// @brief Lifetime of recovery interest + time::milliseconds m_recoveryInterestLifetime; // Security ndn::Name m_defaultSigningId; @@ -454,4 +502,4 @@ private: } // namespace chronosync -#endif // CHRONOSYNC_LOGIC_HPP +#endif // CHRONOSYNC_LOGIC_HPP \ No newline at end of file diff --git a/tests/unit-tests/test-logic.cpp b/tests/unit-tests/test-logic.cpp index ed860af..342af4c 100644 --- a/tests/unit-tests/test-logic.cpp +++ b/tests/unit-tests/test-logic.cpp @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2012-2016 University of California, Los Angeles + * 
Copyright (c) 2012-2017 University of California, Los Angeles * * This file is part of ChronoSync, synchronization library for distributed realtime * applications for NDN. @@ -76,12 +76,14 @@ public: userPrefix[0] = Name("/user0"); userPrefix[1] = Name("/user1"); userPrefix[2] = Name("/user2"); + userPrefix[3] = Name("/user3"); faces[0].reset(new DummyClientFace(io, {true, true})); faces[1].reset(new DummyClientFace(io, {true, true})); faces[2].reset(new DummyClientFace(io, {true, true})); + faces[3].reset(new DummyClientFace(io, {true, true})); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 4; i++) { readInterestOffset[i] = 0; readDataOffset[i] = 0; } @@ -90,7 +92,7 @@ public: void passPacket() { - for (int i = 0; i < 3; i++) + for (int i = 0; i < 4; i++) checkFace(i); } @@ -98,14 +100,14 @@ public: checkFace(int sender) { while (faces[sender]->sentInterests.size() > readInterestOffset[sender]) { - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 4; i++) { if (sender != i) faces[i]->receive(faces[sender]->sentInterests[readInterestOffset[sender]]); } readInterestOffset[sender]++; } while (faces[sender]->sentData.size() > readDataOffset[sender]) { - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 4; i++) { if (sender != i) faces[i]->receive(faces[sender]->sentData[readDataOffset[sender]]); } @@ -115,13 +117,13 @@ public: public: Name syncPrefix; - Name userPrefix[3]; + Name userPrefix[4]; - std::unique_ptr faces[3]; - shared_ptr handler[3]; + std::unique_ptr faces[4]; + shared_ptr handler[4]; - size_t readInterestOffset[3]; - size_t readDataOffset[3]; + size_t readInterestOffset[4]; + size_t readDataOffset[4]; }; BOOST_FIXTURE_TEST_SUITE(LogicTests, LogicFixture) @@ -245,8 +247,8 @@ BOOST_AUTO_TEST_CASE(ResetRecover) advanceClocks(ndn::time::milliseconds(2), 10); passPacket(); } - BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 1); - BOOST_CHECK_EQUAL(handler[0]->map[handler[1]->logic.getSessionName()], 2); + 
BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 1); + BOOST_CHECK_EQUAL(handler[2]->map[handler[1]->logic.getSessionName()], 2); handler[2]->updateSeqNo(4); @@ -291,6 +293,85 @@ BOOST_AUTO_TEST_CASE(RecoverConflict) BOOST_CHECK_EQUAL(handler[2]->map[handler[1]->logic.getSessionName()], 2); } +BOOST_AUTO_TEST_CASE(PartitionRecover) +{ + handler[0] = make_shared(ref(*faces[0]), syncPrefix, userPrefix[0]); + advanceClocks(ndn::time::milliseconds(10), 10); + + handler[1] = make_shared(ref(*faces[1]), syncPrefix, userPrefix[1]); + advanceClocks(ndn::time::milliseconds(10), 10); + + handler[2] = make_shared(ref(*faces[2]), syncPrefix, userPrefix[2]); + advanceClocks(ndn::time::milliseconds(10), 10); + + handler[3] = make_shared(ref(*faces[3]), syncPrefix, userPrefix[3]); + advanceClocks(ndn::time::milliseconds(10), 30); + + handler[0]->updateSeqNo(1); + + for (int i = 0; i < 50; i++) { + advanceClocks(ndn::time::milliseconds(2), 10); + passPacket(); + } + BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 1); + BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 1); + BOOST_CHECK_EQUAL(handler[3]->map[handler[0]->logic.getSessionName()], 1); + + handler[2]->updateSeqNo(2); + + for (int i = 0; i < 50; i++) { + advanceClocks(ndn::time::milliseconds(2), 10); + passPacket(); + } + BOOST_CHECK_EQUAL(handler[0]->map[handler[2]->logic.getSessionName()], 2); + BOOST_CHECK_EQUAL(handler[1]->map[handler[2]->logic.getSessionName()], 2); + BOOST_CHECK_EQUAL(handler[3]->map[handler[2]->logic.getSessionName()], 2); + + // Network Partition start + + handler[1]->updateSeqNo(3); + + for (int i = 0; i < 50; i++) { + advanceClocks(ndn::time::milliseconds(2), 10); + passPacket(); + } + BOOST_CHECK_EQUAL(handler[0]->map[handler[1]->logic.getSessionName()], 3); + handler[2]->map[handler[1]->logic.getSessionName()] = 0; + handler[3]->map[handler[1]->logic.getSessionName()] = 0; + + handler[3]->updateSeqNo(4); + + for (int i = 0; i 
< 50; i++) { + advanceClocks(ndn::time::milliseconds(2), 10); + passPacket(); + } + BOOST_CHECK_EQUAL(handler[2]->map[handler[3]->logic.getSessionName()], 4); + handler[0]->map[handler[3]->logic.getSessionName()] = 0; + handler[1]->map[handler[3]->logic.getSessionName()] = 0; + + // Network partition over + + handler[0]->updateSeqNo(5); + + for (int i = 0; i < 50; i++) { + advanceClocks(ndn::time::milliseconds(2), 10); + passPacket(); + } + BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 5); + BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 5); + BOOST_CHECK_EQUAL(handler[3]->map[handler[0]->logic.getSessionName()], 5); + + handler[2]->updateSeqNo(6); + + for (int i = 0; i < 50; i++) { + advanceClocks(ndn::time::milliseconds(2), 10); + passPacket(); + } + BOOST_CHECK_EQUAL(handler[0]->map[handler[2]->logic.getSessionName()], 6); + BOOST_CHECK_EQUAL(handler[1]->map[handler[2]->logic.getSessionName()], 6); + BOOST_CHECK_EQUAL(handler[3]->map[handler[2]->logic.getSessionName()], 6); +} + BOOST_AUTO_TEST_CASE(MultipleUserUnderOneLogic) { handler[0] = make_shared(ref(*faces[0]), syncPrefix, userPrefix[0]);