**Breaking change** Use bzip2 compression of sync data payload
Change-Id: I0a322e3268a5adc9d221c23c43fc6899c9dbf836 Refs: #4140
This commit is contained in:
@@ -0,0 +1,59 @@
|
||||
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
|
||||
/*
|
||||
* Copyright (c) 2012-2018 University of California, Los Angeles
|
||||
*
|
||||
* This file is part of ChronoSync, synchronization library for distributed realtime
|
||||
* applications for NDN.
|
||||
*
|
||||
* ChronoSync is free software: you can redistribute it and/or modify it under the terms
|
||||
* of the GNU General Public License as published by the Free Software Foundation, either
|
||||
* version 3 of the License, or (at your option) any later version.
|
||||
*
|
||||
* ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
||||
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
* PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "bzip2-helper.hpp"
|
||||
|
||||
#include <boost/iostreams/filtering_stream.hpp>
|
||||
#include <boost/iostreams/detail/iostream.hpp>
|
||||
#include <boost/iostreams/filter/bzip2.hpp>
|
||||
#include <boost/iostreams/copy.hpp>
|
||||
|
||||
#include <ndn-cxx/encoding/buffer-stream.hpp>
|
||||
|
||||
namespace chronosync {
|
||||
namespace bzip2 {
|
||||
|
||||
namespace bio = boost::iostreams;
|
||||
|
||||
std::shared_ptr<ndn::Buffer>
|
||||
compress(const char* buffer, size_t bufferSize)
|
||||
{
|
||||
ndn::OBufferStream os;
|
||||
bio::filtering_stream<bio::output> out;
|
||||
out.push(bio::bzip2_compressor());
|
||||
out.push(os);
|
||||
bio::stream<bio::array_source> in(reinterpret_cast<const char*>(buffer), bufferSize);
|
||||
bio::copy(in, out);
|
||||
return os.buf();
|
||||
}
|
||||
|
||||
std::shared_ptr<ndn::Buffer>
|
||||
decompress(const char* buffer, size_t bufferSize)
|
||||
{
|
||||
ndn::OBufferStream os;
|
||||
bio::filtering_stream<bio::output> out;
|
||||
out.push(bio::bzip2_decompressor());
|
||||
out.push(os);
|
||||
bio::stream<bio::array_source> in(reinterpret_cast<const char*>(buffer), bufferSize);
|
||||
bio::copy(in, out);
|
||||
return os.buf();
|
||||
}
|
||||
|
||||
} // namespace bzip2
|
||||
} // namespace chronosync
|
||||
@@ -0,0 +1,43 @@
|
||||
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
|
||||
/*
|
||||
* Copyright (c) 2012-2018 University of California, Los Angeles
|
||||
*
|
||||
* This file is part of ChronoSync, synchronization library for distributed realtime
|
||||
* applications for NDN.
|
||||
*
|
||||
* ChronoSync is free software: you can redistribute it and/or modify it under the terms
|
||||
* of the GNU General Public License as published by the Free Software Foundation, either
|
||||
* version 3 of the License, or (at your option) any later version.
|
||||
*
|
||||
* ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
||||
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
* PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef CHRONOSYNC_BZIP2_HELPER_HPP
|
||||
#define CHRONOSYNC_BZIP2_HELPER_HPP
|
||||
|
||||
#include <ndn-cxx/encoding/buffer.hpp>
|
||||
|
||||
namespace chronosync {
|
||||
namespace bzip2 {
|
||||
|
||||
/**
|
||||
* @brief Compress @p buffer of size @p bufferSize with bzip2
|
||||
*/
|
||||
std::shared_ptr<ndn::Buffer>
|
||||
compress(const char* buffer, size_t bufferSize);
|
||||
|
||||
/**
|
||||
* @brief Decompress buffer @p buffer of size @p bufferSize with bzip2
|
||||
*/
|
||||
std::shared_ptr<ndn::Buffer>
|
||||
decompress(const char* buffer, size_t bufferSize);
|
||||
|
||||
} // namespace bzip2
|
||||
} // namespace chronosync
|
||||
|
||||
#endif // CHRONOSYNC_BZIP2_HELPER_HPP
|
||||
+46
-30
@@ -25,6 +25,7 @@
|
||||
|
||||
#include "logic.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "bzip2-helper.hpp"
|
||||
|
||||
#include <ndn-cxx/util/backports.hpp>
|
||||
#include <ndn-cxx/util/string-helper.hpp>
|
||||
@@ -413,7 +414,9 @@ Logic::onSyncDataValidated(const Data& data, bool firstData)
|
||||
Name name = data.getName();
|
||||
ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
|
||||
|
||||
processSyncData(name, digest, data.getContent().blockFromValue(), firstData);
|
||||
auto contentBuffer = bzip2::decompress(reinterpret_cast<const char*>(data.getContent().value()),
|
||||
data.getContent().value_size());
|
||||
processSyncData(name, digest, Block(contentBuffer), firstData);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -669,10 +672,10 @@ Logic::sendSyncInterest()
|
||||
}
|
||||
|
||||
void
|
||||
Logic::trimState(State& partialState, const State& state, size_t maxSize)
|
||||
Logic::trimState(State& partialState, const State& state, size_t nExcludedStates)
|
||||
{
|
||||
partialState.reset();
|
||||
State tmp;
|
||||
|
||||
std::vector<ConstLeafPtr> leaves;
|
||||
for (const ConstLeafPtr& leaf : state.getLeaves()) {
|
||||
leaves.push_back(leaf);
|
||||
@@ -680,15 +683,52 @@ Logic::trimState(State& partialState, const State& state, size_t maxSize)
|
||||
|
||||
std::shuffle(leaves.begin(), leaves.end(), m_rng);
|
||||
|
||||
size_t statesToEncode = leaves.size() - std::min(leaves.size() - 1, nExcludedStates);
|
||||
for (const auto& constLeafPtr : leaves) {
|
||||
tmp.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
|
||||
if (tmp.wireEncode().size() >= maxSize) {
|
||||
if (statesToEncode == 0) {
|
||||
break;
|
||||
}
|
||||
partialState.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
|
||||
--statesToEncode;
|
||||
}
|
||||
}
|
||||
|
||||
Data
|
||||
Logic::encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state)
|
||||
{
|
||||
Data syncReply(name);
|
||||
syncReply.setFreshnessPeriod(m_syncReplyFreshness);
|
||||
|
||||
auto finalizeReply = [this, &nodePrefix, &syncReply] (const State& state) {
|
||||
auto contentBuffer = bzip2::compress(reinterpret_cast<const char*>(state.wireEncode().wire()),
|
||||
state.wireEncode().size());
|
||||
syncReply.setContent(contentBuffer);
|
||||
|
||||
if (m_nodeList[nodePrefix].signingId.empty())
|
||||
m_keyChain.sign(syncReply);
|
||||
else
|
||||
m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
|
||||
};
|
||||
|
||||
finalizeReply(state);
|
||||
|
||||
size_t nExcludedStates = 1;
|
||||
while (syncReply.wireEncode().size() > getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) {
|
||||
if (nExcludedStates == 1) {
|
||||
// To show this debug message only once
|
||||
_LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << (getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) << ")");
|
||||
}
|
||||
State partialState;
|
||||
trimState(partialState, state, nExcludedStates);
|
||||
finalizeReply(partialState);
|
||||
|
||||
BOOST_ASSERT(state.getLeaves().size() != 0);
|
||||
nExcludedStates *= 2;
|
||||
}
|
||||
|
||||
return syncReply;
|
||||
}
|
||||
|
||||
void
|
||||
Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
|
||||
{
|
||||
@@ -696,31 +736,7 @@ Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state
|
||||
if (m_nodeList.find(nodePrefix) == m_nodeList.end())
|
||||
return;
|
||||
|
||||
Data syncReply(name);
|
||||
syncReply.setContent(state.wireEncode());
|
||||
syncReply.setFreshnessPeriod(m_syncReplyFreshness);
|
||||
|
||||
if (m_nodeList[nodePrefix].signingId.empty())
|
||||
m_keyChain.sign(syncReply);
|
||||
else
|
||||
m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
|
||||
|
||||
if (syncReply.wireEncode().size() > getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) {
|
||||
_LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << getMaxPacketLimit() << ")");
|
||||
auto maxContentSize = getMaxPacketLimit() - (syncReply.wireEncode().size() - syncReply.getContent().size());
|
||||
maxContentSize -= NDNLP_EXPECTED_OVERHEAD;
|
||||
|
||||
State partialState;
|
||||
trimState(partialState, state, maxContentSize);
|
||||
syncReply.setContent(partialState.wireEncode());
|
||||
|
||||
if (m_nodeList[nodePrefix].signingId.empty())
|
||||
m_keyChain.sign(syncReply);
|
||||
else
|
||||
m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
|
||||
}
|
||||
|
||||
m_face.put(syncReply);
|
||||
m_face.put(encodeSyncReply(nodePrefix, name, state));
|
||||
|
||||
// checking if our own interest got satisfied
|
||||
if (m_outstandingInterestName == name) {
|
||||
|
||||
+5
-2
@@ -230,9 +230,12 @@ CHRONOSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
|
||||
return m_state;
|
||||
}
|
||||
|
||||
/// @brief Trim @p state to a subset @p partialState whose encoding does not exceed @p maxSize
|
||||
/// Create a subset @p partialState excluding @p nExcludedStates from @p state
|
||||
void
|
||||
trimState(State& partialState, const State& state, size_t maxSize);
|
||||
trimState(State& partialState, const State& state, size_t excludedStates);
|
||||
|
||||
Data
|
||||
encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state);
|
||||
|
||||
private:
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
|
||||
/*
|
||||
* Copyright (c) 2012-2018 University of California, Los Angeles
|
||||
*
|
||||
* This file is part of ChronoSync, synchronization library for distributed realtime
|
||||
* applications for NDN.
|
||||
*
|
||||
* ChronoSync is free software: you can redistribute it and/or modify it under the terms
|
||||
* of the GNU General Public License as published by the Free Software Foundation, either
|
||||
* version 3 of the License, or (at your option) any later version.
|
||||
*
|
||||
* ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
|
||||
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
|
||||
* PURPOSE. See the GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along with
|
||||
* ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "bzip2-helper.hpp"
|
||||
|
||||
#include "boost-test.hpp"
|
||||
|
||||
namespace chronosync {
|
||||
namespace test {
|
||||
|
||||
using std::tuple;
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(TestBzip2Helper)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(Basic)
|
||||
{
|
||||
std::string message = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
|
||||
"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
|
||||
"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
|
||||
"nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
|
||||
"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. "
|
||||
"Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt "
|
||||
"mollit anim id est laborum.";
|
||||
|
||||
auto compressed = bzip2::compress(message.data(), message.size());
|
||||
BOOST_CHECK_LT(compressed->size(), message.size());
|
||||
|
||||
auto decompressed = bzip2::decompress(reinterpret_cast<const char*>(compressed->data()), compressed->size());
|
||||
BOOST_CHECK_EQUAL(message.size(), decompressed->size());
|
||||
BOOST_CHECK_EQUAL(message, std::string(reinterpret_cast<const char*>(decompressed->data()), decompressed->size()));
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
} // namespace test
|
||||
} // namespace chronosync
|
||||
@@ -18,12 +18,15 @@
|
||||
*/
|
||||
|
||||
#include "logic.hpp"
|
||||
#include "bzip2-helper.hpp"
|
||||
|
||||
#include "boost-test.hpp"
|
||||
#include "../identity-management-fixture.hpp"
|
||||
|
||||
#include "dummy-forwarder.hpp"
|
||||
|
||||
#include <ndn-cxx/util/random.hpp>
|
||||
|
||||
namespace chronosync {
|
||||
namespace test {
|
||||
|
||||
@@ -319,7 +322,7 @@ BOOST_AUTO_TEST_CASE(MultipleUserUnderOneLogic)
|
||||
BOOST_CHECK_EQUAL(handler[1]->logic.getSessionNames().size(), 2);
|
||||
}
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(ExplodeData, ndn::tests::IdentityManagementTimeFixture)
|
||||
BOOST_FIXTURE_TEST_CASE(TrimState, ndn::tests::IdentityManagementTimeFixture)
|
||||
{
|
||||
Name syncPrefix("/ndn/broadcast/sync");
|
||||
Name userPrefix("/user");
|
||||
@@ -327,25 +330,47 @@ BOOST_FIXTURE_TEST_CASE(ExplodeData, ndn::tests::IdentityManagementTimeFixture)
|
||||
Logic logic(face, syncPrefix, userPrefix, bind(onUpdate, _1));
|
||||
|
||||
State state;
|
||||
int i = 0;
|
||||
while (state.wireEncode().size() < ndn::MAX_NDN_PACKET_SIZE) {
|
||||
Name name("/test1");
|
||||
name.append(std::to_string(i));
|
||||
state.update(name, i++);
|
||||
for (size_t i = 0; i != 100; ++i) {
|
||||
state.update(Name("/to/trim").appendNumber(i), 42);
|
||||
}
|
||||
|
||||
Data syncReply(syncPrefix);
|
||||
syncReply.setContent(state.wireEncode());
|
||||
m_keyChain.sign(syncReply);
|
||||
State partial;
|
||||
logic.trimState(partial, state, 1);
|
||||
BOOST_CHECK_EQUAL(partial.getLeaves().size(), 99);
|
||||
|
||||
BOOST_REQUIRE(syncReply.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE);
|
||||
logic.trimState(partial, state, 100);
|
||||
BOOST_CHECK_EQUAL(partial.getLeaves().size(), 1);
|
||||
|
||||
State partialState;
|
||||
auto maxSize = ndn::MAX_NDN_PACKET_SIZE - (syncReply.wireEncode().size() - state.wireEncode().size());
|
||||
logic.trimState(partialState, state, maxSize);
|
||||
logic.trimState(partial, state, 101);
|
||||
BOOST_CHECK_EQUAL(partial.getLeaves().size(), 1);
|
||||
|
||||
syncReply.setContent(partialState.wireEncode());
|
||||
BOOST_REQUIRE(syncReply.wireEncode().size() < ndn::MAX_NDN_PACKET_SIZE);
|
||||
logic.trimState(partial, state, 42);
|
||||
BOOST_CHECK_EQUAL(partial.getLeaves().size(), 58);
|
||||
}
|
||||
|
||||
BOOST_FIXTURE_TEST_CASE(VeryLargeState, ndn::tests::IdentityManagementTimeFixture)
|
||||
{
|
||||
addIdentity("/bla");
|
||||
Name syncPrefix("/ndn/broadcast/sync");
|
||||
Name userPrefix("/user");
|
||||
ndn::util::DummyClientFace face;
|
||||
Logic logic(face, syncPrefix, userPrefix, bind(onUpdate, _1));
|
||||
|
||||
State state;
|
||||
for (size_t i = 0; i < 50000 && bzip2::compress(reinterpret_cast<const char*>(state.wireEncode().wire()),
|
||||
state.wireEncode().size())->size() < ndn::MAX_NDN_PACKET_SIZE;
|
||||
i += 10) {
|
||||
Name prefix("/to/trim");
|
||||
prefix.appendNumber(i);
|
||||
for (size_t j = 0; j != 20; ++j) {
|
||||
prefix.appendNumber(ndn::random::generateWord32());
|
||||
}
|
||||
state.update(prefix, ndn::random::generateWord32());
|
||||
}
|
||||
BOOST_TEST_MESSAGE("Got state with " << state.getLeaves().size() << " leaves");
|
||||
|
||||
auto data = logic.encodeSyncReply(userPrefix, "/fake/prefix/of/interest", state);
|
||||
BOOST_CHECK_LE(data.wireEncode().size(), ndn::MAX_NDN_PACKET_SIZE);
|
||||
}
|
||||
|
||||
class MaxPacketCustomizationFixture
|
||||
@@ -361,6 +386,7 @@ public:
|
||||
|
||||
~MaxPacketCustomizationFixture()
|
||||
{
|
||||
unsetenv("CHRONOSYNC_MAX_PACKET_SIZE");
|
||||
if (oldSize) {
|
||||
setenv("CHRONOSYNC_MAX_PACKET_SIZE", oldSize->c_str(), 1);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user