/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* * Copyright (c) 2014-2020, The University of Memphis * * This file is part of PSync. * See AUTHORS.md for complete list of PSync authors and contributors. * * PSync is free software: you can redistribute it and/or modify it under the terms * of the GNU Lesser General Public License as published by the Free Software Foundation, * either version 3 of the License, or (at your option) any later version. * * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE. See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License along with * PSync, e.g., in COPYING.md file. If not, see . */ #include "PSync/partial-producer.hpp" #include "PSync/consumer.hpp" #include "tests/boost-test.hpp" #include "tests/io-fixture.hpp" #include #include namespace psync { using namespace ndn; class PartialSyncFixture : public tests::IoFixture { public: PartialSyncFixture() { producer = make_shared(40, face, syncPrefix, userPrefix); addUserNodes("testUser", 10); } ~PartialSyncFixture() override { for (const auto& consumer : consumers) { if (consumer) { consumer->stop(); } } } void addConsumer(int id, const std::vector& subscribeTo, bool linkToProducer = true) { consumerFaces[id] = std::make_shared(m_io, util::DummyClientFace::Options{true, true}); if (linkToProducer) { face.linkTo(*consumerFaces[id]); } consumers[id] = std::make_shared(syncPrefix, *consumerFaces[id], [&, id] (const auto& availableSubs) { numHelloDataRcvd++; BOOST_CHECK(checkSubList(availableSubs)); checkIBFUpdated(id); for (const auto& sub : subscribeTo) { auto it = availableSubs.find(sub); consumers[id]->addSubscription(sub, it->second); } consumers[id]->sendSyncInterest(); }, [&, id] (const auto& updates) { numSyncDataRcvd++; checkIBFUpdated(id); for (const auto& update : updates) { BOOST_CHECK(consumers[id]->isSubscribed(update.prefix)); BOOST_CHECK_EQUAL(oldSeqMap.at(update.prefix) + 1, update.lowSeq); BOOST_CHECK_EQUAL(producer->m_prefixes.at(update.prefix), update.highSeq); BOOST_CHECK_EQUAL(consumers[id]->getSeqNo(update.prefix).value(), update.highSeq); } }, 40, 0.001); advanceClocks(ndn::time::milliseconds(10)); } void checkIBFUpdated(int id) { Name emptyName; producer->m_iblt.appendToName(emptyName); BOOST_CHECK_EQUAL(consumers[id]->m_iblt, emptyName); } bool checkSubList(const std::map& availableSubs) const { for (const auto& prefix : producer->m_prefixes) { auto it = availableSubs.find(prefix.first); if (it == availableSubs.end()) { return false; } } return true; } void addUserNodes(const std::string& prefix, int numOfUserNodes) { // zeroth is added through constructor for (int i = 1; i < numOfUserNodes; i++) { producer->addUserNode(prefix + "-" + to_string(i)); } } void publishUpdateFor(const std::string& prefix) { oldSeqMap = producer->m_prefixes; producer->publishName(prefix); advanceClocks(ndn::time::milliseconds(10)); } void updateSeqFor(const std::string& prefix, uint64_t seq) { oldSeqMap = producer->m_prefixes; producer->updateSeqNo(prefix, seq); } util::DummyClientFace face{m_io, {true, true}}; Name syncPrefix{"psync"}; Name userPrefix{"testUser-0"}; shared_ptr producer; std::map oldSeqMap; shared_ptr consumers[3]; shared_ptr consumerFaces[3]; int numHelloDataRcvd = 0; int numSyncDataRcvd = 0; }; BOOST_FIXTURE_TEST_SUITE(TestPartialSync, PartialSyncFixture) BOOST_AUTO_TEST_CASE(Simple) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); publishUpdateFor("testUser-3"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 2); } BOOST_AUTO_TEST_CASE(MissedUpdate) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); updateSeqFor("testUser-2", 3); BOOST_CHECK_EQUAL(numSyncDataRcvd, 0); // The sync interest sent after hello will timeout advanceClocks(ndn::time::milliseconds(999)); BOOST_CHECK_EQUAL(numSyncDataRcvd, 0); // Next sync interest will bring back the sync data advanceClocks(ndn::time::milliseconds(1)); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); } BOOST_AUTO_TEST_CASE(LateSubscription) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); consumers[0]->addSubscription("testUser-3", 0); consumers[0]->sendSyncInterest(); publishUpdateFor("testUser-3"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 2); } BOOST_AUTO_TEST_CASE(ConsumerSyncTimeout) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); BOOST_CHECK_EQUAL(producer->m_pendingEntries.size(), 0); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(producer->m_pendingEntries.size(), 1); advanceClocks(ndn::time::milliseconds(10), 100); // The next Interest is sent after the first one immediately BOOST_CHECK_EQUAL(producer->m_pendingEntries.size(), 1); advanceClocks(ndn::time::milliseconds(10), 100); int numSyncInterests = 0; for (const auto& interest : consumerFaces[0]->sentInterests) { if (interest.getName().getSubName(0, 2) == Name("/psync/sync")) { numSyncInterests++; } } BOOST_CHECK_EQUAL(numSyncInterests, 3); BOOST_CHECK_EQUAL(numSyncDataRcvd, 0); } BOOST_AUTO_TEST_CASE(MultipleConsumersWithSameSubList) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); addConsumer(1, subscribeTo); addConsumer(2, subscribeTo); consumers[0]->sendHelloInterest(); consumers[1]->sendHelloInterest(); consumers[2]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 3); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 3); publishUpdateFor("testUser-3"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 3); } BOOST_AUTO_TEST_CASE(MultipleConsumersWithDifferentSubList) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); std::vector subscribeTo1{"testUser-1", "testUser-3", "testUser-5"}; addConsumer(1, subscribeTo1); std::vector subscribeTo2{"testUser-2", "testUser-3"}; addConsumer(2, subscribeTo2); consumers[0]->sendHelloInterest(); consumers[1]->sendHelloInterest(); consumers[2]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 3); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 2); numSyncDataRcvd = 0; publishUpdateFor("testUser-3"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 2); } BOOST_AUTO_TEST_CASE(ReplicatedProducer) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); // Link to first producer goes down face.unlink(); util::DummyClientFace face2(m_io, {true, true}); PartialProducer replicatedProducer(40, face2, syncPrefix, userPrefix); for (int i = 1; i < 10; i++) { replicatedProducer.addUserNode("testUser-" + to_string(i)); } advanceClocks(ndn::time::milliseconds(10)); replicatedProducer.publishName("testUser-2"); // Link to a replicated producer comes up face2.linkTo(*consumerFaces[0]); BOOST_CHECK_EQUAL(face2.sentData.size(), 0); // Update in first producer as well so consumer on sync data // callback checks still pass publishUpdateFor("testUser-2"); replicatedProducer.publishName("testUser-2"); advanceClocks(ndn::time::milliseconds(15), 100); BOOST_CHECK_EQUAL(numSyncDataRcvd, 2); BOOST_CHECK_EQUAL(face2.sentData.size(), 1); } BOOST_AUTO_TEST_CASE(ApplicationNack) { // 50 is more than expected number of entries of 40 in the producer's IBF addUserNodes("testUser", 50); std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); publishUpdateFor("testUser-2"); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); oldSeqMap = producer->m_prefixes; for (int i = 0; i < 50; i++) { Name prefix("testUser-" + to_string(i)); producer->updateSeqNo(prefix, producer->getSeqNo(prefix).value() + 1); } // Next sync interest should trigger the nack advanceClocks(ndn::time::milliseconds(15), 100); // Application should have been notified that new data is available // from the hello itself. BOOST_CHECK_EQUAL(numSyncDataRcvd, 2); bool nackRcvd = false; for (const auto& data : face.sentData) { if (data.getContentType() == tlv::ContentType_Nack) { nackRcvd = true; break; } } BOOST_CHECK(nackRcvd); publishUpdateFor("testUser-4"); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numSyncDataRcvd, 3); } BOOST_AUTO_TEST_CASE(SegmentedHello) { std::vector subscribeTo{"testUser-2", "testUser-4", "testUser-6"}; addConsumer(0, subscribeTo); addUserNodes("testUser", 400); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); // Simulate sending delayed interest for second segment BOOST_REQUIRE(!face.sentData.empty()); Name dataName = face.sentData.back().getName(); face.sentData.clear(); BOOST_CHECK_EQUAL(producer->m_segmentPublisher.m_ims.size(), 2); advanceClocks(ndn::time::milliseconds(1000)); BOOST_CHECK_EQUAL(producer->m_segmentPublisher.m_ims.size(), 0); producer->onHelloInterest(consumers[0]->m_helloInterestPrefix, Interest(dataName)); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(producer->m_segmentPublisher.m_ims.size(), 2); BOOST_REQUIRE(!face.sentData.empty()); BOOST_CHECK_EQUAL(face.sentData.front().getName().at(-1).toSegment(), 1); } BOOST_AUTO_TEST_CASE(SegmentedSync) { Name longNameToExceedDataSize; for (int i = 0; i < 100; i++) { longNameToExceedDataSize.append("test-" + std::to_string(i)); } addUserNodes(longNameToExceedDataSize.toUri(), 10); std::vector subscribeTo; for (int i = 1; i < 10; i++) { subscribeTo.push_back(longNameToExceedDataSize.toUri() + "-" + to_string(i)); } addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); // To be used later to simulate sending delayed segmented interest Name syncInterestName(consumers[0]->m_syncInterestPrefix); consumers[0]->m_bloomFilter.appendToName(syncInterestName); syncInterestName.append(consumers[0]->m_iblt); syncInterestName.appendVersion(); syncInterestName.appendSegment(1); oldSeqMap = producer->m_prefixes; for (int i = 1; i < 10; i++) { producer->updateSeqNo(longNameToExceedDataSize.toUri() + "-" + to_string(i), 1); } advanceClocks(ndn::time::milliseconds(999)); BOOST_CHECK_EQUAL(numSyncDataRcvd, 0); advanceClocks(ndn::time::milliseconds(1)); BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); // Simulate sending delayed interest for second segment face.sentData.clear(); consumerFaces[0]->sentData.clear(); BOOST_CHECK_EQUAL(producer->m_segmentPublisher.m_ims.size(), 2); advanceClocks(ndn::time::milliseconds(2000)); BOOST_CHECK_EQUAL(producer->m_segmentPublisher.m_ims.size(), 0); producer->onSyncInterest(consumers[0]->m_syncInterestPrefix, Interest(syncInterestName)); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(producer->m_segmentPublisher.m_ims.size(), 2); BOOST_REQUIRE(!face.sentData.empty()); BOOST_CHECK_EQUAL(face.sentData.front().getName().at(-1).toSegment(), 1); } BOOST_AUTO_TEST_CASE(DelayedSubscription) // #5122 { publishUpdateFor("testUser-2"); std::vector subscribeTo{"testUser-2", "testUser-4"}; addConsumer(0, subscribeTo); consumers[0]->sendHelloInterest(); advanceClocks(ndn::time::milliseconds(10)); BOOST_CHECK_EQUAL(numHelloDataRcvd, 1); // Application came up late and subscribed to testUser-2 // after Producer had already published the first update. // So by default Consumer will let the application know that // the prefix it subscribed to has already some updates BOOST_CHECK_EQUAL(numSyncDataRcvd, 1); } BOOST_AUTO_TEST_SUITE_END() } // namespace psync