face: simplify and optimize Internal{Forwarder,Client}Transport

Refs: #4528
Change-Id: Ie3246382965640e0d2cb71116b6526e68925887c
This commit is contained in:
Davide Pesavento
2019-03-31 02:10:02 -04:00
parent 16916ae20a
commit 284bd626fd
5 changed files with 146 additions and 108 deletions
+39 -30
View File
@@ -46,21 +46,21 @@ InternalForwarderTransport::InternalForwarderTransport(const FaceUri& localUri,
}
void
InternalForwarderTransport::receiveFromLink(const Block& packet)
InternalForwarderTransport::receivePacket(Block&& packet)
{
NFD_LOG_FACE_TRACE(__func__);
Packet p;
p.packet = packet;
this->receive(std::move(p));
getGlobalIoService().post([this, pkt = std::move(packet)] () mutable {
NFD_LOG_FACE_TRACE("Received: " << pkt.size() << " bytes");
receive(Packet{std::move(pkt)});
});
}
void
InternalForwarderTransport::doSend(Packet&& packet)
{
NFD_LOG_FACE_TRACE(__func__);
NFD_LOG_FACE_TRACE("Sending to " << m_peer);
this->emitSignal(afterSend, packet.packet);
if (m_peer)
m_peer->receivePacket(std::move(packet.packet));
}
void
@@ -68,50 +68,59 @@ InternalForwarderTransport::doClose()
{
NFD_LOG_FACE_TRACE(__func__);
this->setState(TransportState::CLOSED);
setState(TransportState::CLOSED);
}
static void
asyncReceive(InternalTransportBase* recipient, const Block& packet)
InternalClientTransport::~InternalClientTransport()
{
getGlobalIoService().post([packet, recipient] {
recipient->receiveFromLink(packet);
});
if (m_forwarder != nullptr) {
m_forwarder->setPeer(nullptr);
}
}
void
InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarderTransport)
InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarder)
{
NFD_LOG_DEBUG(__func__ << " " << forwarderTransport);
NFD_LOG_DEBUG(__func__ << " " << forwarder);
m_fwToClientTransmitConn.disconnect();
m_clientToFwTransmitConn.disconnect();
m_fwTransportStateConn.disconnect();
if (m_forwarder != nullptr) {
// disconnect from the old forwarder transport
m_forwarder->setPeer(nullptr);
m_fwTransportStateConn.disconnect();
}
if (forwarderTransport != nullptr) {
m_fwToClientTransmitConn = forwarderTransport->afterSend.connect(bind(&asyncReceive, this, _1));
m_clientToFwTransmitConn = this->afterSend.connect(bind(&asyncReceive, forwarderTransport, _1));
m_fwTransportStateConn = forwarderTransport->afterStateChange.connect(
m_forwarder = forwarder;
if (m_forwarder != nullptr) {
// connect to the new forwarder transport
m_forwarder->setPeer(this);
m_fwTransportStateConn = m_forwarder->afterStateChange.connect(
[this] (TransportState oldState, TransportState newState) {
if (newState == TransportState::CLOSED) {
this->connectToForwarder(nullptr);
connectToForwarder(nullptr);
}
});
}
}
void
InternalClientTransport::receiveFromLink(const Block& packet)
InternalClientTransport::receivePacket(Block&& packet)
{
if (m_receiveCallback) {
m_receiveCallback(packet);
}
getGlobalIoService().post([this, pkt = std::move(packet)] {
NFD_LOG_TRACE("Received: " << pkt.size() << " bytes");
if (m_receiveCallback) {
m_receiveCallback(pkt);
}
});
}
void
InternalClientTransport::send(const Block& wire)
{
this->emitSignal(afterSend, wire);
NFD_LOG_TRACE("Sending to " << m_forwarder);
if (m_forwarder)
m_forwarder->receivePacket(Block{wire});
}
void
@@ -121,7 +130,7 @@ InternalClientTransport::send(const Block& header, const Block& payload)
encoder.appendByteArray(header.wire(), header.size());
encoder.appendByteArray(payload.wire(), payload.size());
this->send(encoder.block());
send(encoder.block());
}
} // namespace face
+35 -32
View File
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2018, Regents of the University of California,
* Copyright (c) 2014-2019, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -33,7 +33,7 @@
namespace nfd {
namespace face {
/** \brief abstracts a transport that can be paired with another
/** \brief Abstracts a transport that can be paired with another.
*/
class InternalTransportBase
{
@@ -41,49 +41,53 @@ public:
virtual
~InternalTransportBase() = default;
/** \brief causes the transport to receive a link-layer packet
*/
virtual void
receiveFromLink(const Block& packet) = 0;
signal::Signal<InternalTransportBase, Block> afterSend;
protected:
DECLARE_SIGNAL_EMIT(afterSend)
receivePacket(Block&& packet) = 0;
};
/** \brief implements a forwarder-side transport that can be paired with another
/** \brief Implements a forwarder-side transport that can be paired with another transport.
*/
class InternalForwarderTransport : public Transport, public InternalTransportBase
class InternalForwarderTransport final : public Transport, public InternalTransportBase
{
public:
explicit
InternalForwarderTransport(const FaceUri& localUri = FaceUri("internal://"),
const FaceUri& remoteUri = FaceUri("internal://"),
ndn::nfd::FaceScope scope = ndn::nfd::FACE_SCOPE_LOCAL,
ndn::nfd::LinkType linkType = ndn::nfd::LINK_TYPE_POINT_TO_POINT);
void
receiveFromLink(const Block& packet) override;
setPeer(InternalTransportBase* peer)
{
m_peer = peer;
}
void
receivePacket(Block&& packet) final;
protected:
void
doClose() override;
doClose() final;
private:
void
doSend(Packet&& packet) override;
doSend(Packet&& packet) final;
private:
NFD_LOG_MEMBER_DECL();
InternalTransportBase* m_peer = nullptr;
};
/** \brief implements a client-side transport that can be paired with another
/** \brief Implements a client-side transport that can be paired with an InternalForwarderTransport.
*/
class InternalClientTransport : public ndn::Transport, public InternalTransportBase
class InternalClientTransport final : public ndn::Transport, public InternalTransportBase
{
public:
/** \brief connect to a forwarder-side transport
* \param forwarderTransport the forwarder-side transport to connect to; may be nullptr
~InternalClientTransport() final;
/** \brief Connect to a forwarder-side transport.
* \param forwarder the forwarder-side transport to connect to; may be nullptr
*
* The connected forwarder-side transport will be disconnected automatically if this method
* is called again, or if that transport is closed.
@@ -91,37 +95,36 @@ public:
* all sent packets would be lost, and nothing would be received.
*/
void
connectToForwarder(InternalForwarderTransport* forwarderTransport);
connectToForwarder(InternalForwarderTransport* forwarder);
void
receiveFromLink(const Block& packet) override;
receivePacket(Block&& packet) final;
void
close() override
send(const Block& wire) final;
void
send(const Block& header, const Block& payload) final;
void
close() final
{
}
void
pause() override
pause() final
{
}
void
resume() override
resume() final
{
}
void
send(const Block& wire) override;
void
send(const Block& header, const Block& payload) override;
private:
NFD_LOG_MEMBER_DECL();
signal::ScopedConnection m_fwToClientTransmitConn;
signal::ScopedConnection m_clientToFwTransmitConn;
InternalForwarderTransport* m_forwarder = nullptr;
signal::ScopedConnection m_fwTransportStateConn;
};
+12 -19
View File
@@ -42,7 +42,7 @@ class InternalFaceFixture : public GlobalIoTimeFixture, public KeyChainFixture
public:
InternalFaceFixture()
{
std::tie(forwarderFace, clientFace) = makeInternalFace(m_keyChain);;
std::tie(forwarderFace, clientFace) = makeInternalFace(m_keyChain);
forwarderFace->afterReceiveInterest.connect(
[this] (const Interest& interest) { receivedInterests.push_back(interest); } );
@@ -82,7 +82,7 @@ BOOST_AUTO_TEST_CASE(TransportStaticProperties)
BOOST_AUTO_TEST_CASE(ReceiveInterestTimeout)
{
shared_ptr<Interest> interest = makeInterest("/TLETccRv");
auto interest = makeInterest("/TLETccRv");
interest->setInterestLifetime(100_ms);
bool hasTimeout = false;
@@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestTimeout)
BOOST_AUTO_TEST_CASE(ReceiveInterestSendData)
{
shared_ptr<Interest> interest = makeInterest("/PQstEJGdL");
auto interest = makeInterest("/PQstEJGdL");
bool hasReceivedData = false;
clientFace->expressInterest(*interest,
@@ -117,8 +117,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestSendData)
BOOST_REQUIRE_EQUAL(receivedInterests.size(), 1);
BOOST_CHECK_EQUAL(receivedInterests.back().getName(), "/PQstEJGdL");
shared_ptr<Data> data = makeData("/PQstEJGdL/aI7oCrDXNX");
forwarderFace->sendData(*data);
forwarderFace->sendData(*makeData("/PQstEJGdL/aI7oCrDXNX"));
this->advanceClocks(1_ms, 10);
BOOST_CHECK(hasReceivedData);
@@ -126,7 +125,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestSendData)
BOOST_AUTO_TEST_CASE(ReceiveInterestSendNack)
{
shared_ptr<Interest> interest = makeInterest("/1HrsRM1X", 152);
auto interest = makeInterest("/1HrsRM1X", 152);
bool hasReceivedNack = false;
clientFace->expressInterest(*interest,
@@ -141,8 +140,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestSendNack)
BOOST_REQUIRE_EQUAL(receivedInterests.size(), 1);
BOOST_CHECK_EQUAL(receivedInterests.back().getName(), "/1HrsRM1X");
lp::Nack nack = makeNack("/1HrsRM1X", 152, lp::NackReason::NO_ROUTE);
forwarderFace->sendNack(nack);
forwarderFace->sendNack(makeNack("/1HrsRM1X", 152, lp::NackReason::NO_ROUTE));
this->advanceClocks(1_ms, 10);
BOOST_CHECK(hasReceivedNack);
@@ -156,12 +154,10 @@ BOOST_AUTO_TEST_CASE(SendInterestReceiveData)
hasDeliveredInterest = true;
BOOST_CHECK_EQUAL(interest.getName(), "/Wpc8TnEeoF/f6SzV8hD");
shared_ptr<Data> data = makeData("/Wpc8TnEeoF/f6SzV8hD/3uytUJCuIi");
clientFace->put(*data);
clientFace->put(*makeData("/Wpc8TnEeoF/f6SzV8hD/3uytUJCuIi"));
});
shared_ptr<Interest> interest = makeInterest("/Wpc8TnEeoF/f6SzV8hD");
forwarderFace->sendInterest(*interest);
forwarderFace->sendInterest(*makeInterest("/Wpc8TnEeoF/f6SzV8hD"));
this->advanceClocks(1_ms, 10);
BOOST_CHECK(hasDeliveredInterest);
@@ -177,12 +173,10 @@ BOOST_AUTO_TEST_CASE(SendInterestReceiveNack)
hasDeliveredInterest = true;
BOOST_CHECK_EQUAL(interest.getName(), "/4YgJKWcXN/5oaTe05o");
lp::Nack nack = makeNack("/4YgJKWcXN/5oaTe05o", 191, lp::NackReason::NO_ROUTE);
clientFace->put(nack);
clientFace->put(makeNack("/4YgJKWcXN/5oaTe05o", 191, lp::NackReason::NO_ROUTE));
});
shared_ptr<Interest> interest = makeInterest("/4YgJKWcXN/5oaTe05o", 191);
forwarderFace->sendInterest(*interest);
forwarderFace->sendInterest(*makeInterest("/4YgJKWcXN/5oaTe05o", 191));
this->advanceClocks(1_ms, 10);
BOOST_CHECK(hasDeliveredInterest);
@@ -197,7 +191,7 @@ BOOST_AUTO_TEST_CASE(CloseForwarderFace)
BOOST_CHECK_EQUAL(forwarderFace->getState(), FaceState::CLOSED);
forwarderFace.reset();
shared_ptr<Interest> interest = makeInterest("/zpHsVesu0B");
auto interest = makeInterest("/zpHsVesu0B");
interest->setInterestLifetime(100_ms);
bool hasTimeout = false;
@@ -216,8 +210,7 @@ BOOST_AUTO_TEST_CASE(CloseClientFace)
g_io.poll(); // #3248 workaround
clientFace.reset();
shared_ptr<Interest> interest = makeInterest("/aau42XQqb");
forwarderFace->sendInterest(*interest);
forwarderFace->sendInterest(*makeInterest("/aau42XQqb"));
BOOST_CHECK_NO_THROW(this->advanceClocks(1_ms, 10));
}
+27 -21
View File
@@ -24,6 +24,7 @@
*/
#include "topology-tester.hpp"
#include "daemon/global.hpp"
#include "face/generic-link-service.hpp"
@@ -33,13 +34,19 @@ namespace nfd {
namespace fw {
namespace tests {
using face::InternalTransportBase;
using face::InternalForwarderTransport;
using face::InternalClientTransport;
using face::GenericLinkService;
using face::InternalClientTransport;
using face::InternalForwarderTransport;
TopologyLink::NodeTransport::NodeTransport(shared_ptr<Face> f, ReceiveProxy::Callback cb)
: face(std::move(f))
, transport(dynamic_cast<InternalForwarderTransport*>(face->getTransport()))
, proxy(std::move(cb))
{
BOOST_ASSERT(transport != nullptr);
}
TopologyLink::TopologyLink(time::nanoseconds delay)
: m_isUp(true)
{
this->setDelay(delay);
}
@@ -66,19 +73,19 @@ TopologyLink::setDelay(time::nanoseconds delay)
void
TopologyLink::addFace(TopologyNode i, shared_ptr<Face> face)
{
BOOST_ASSERT(m_transports.count(i) == 0);
auto& nodeTransport = m_transports[i];
auto receiveCb = [this, i] (Block&& pkt) { transmit(i, std::move(pkt)); };
nodeTransport.face = face;
auto ret = m_transports.emplace(std::piecewise_construct,
std::forward_as_tuple(i),
std::forward_as_tuple(std::move(face), std::move(receiveCb)));
BOOST_ASSERT(ret.second);
nodeTransport.transport = dynamic_cast<InternalTransportBase*>(face->getTransport());
BOOST_ASSERT(nodeTransport.transport != nullptr);
nodeTransport.transport->afterSend.connect(
[this, i] (const Block& packet) { this->transmit(i, packet); });
auto& node = ret.first->second;
node.transport->setPeer(&node.proxy);
}
void
TopologyLink::transmit(TopologyNode i, const Block& packet)
TopologyLink::transmit(TopologyNode i, Block&& packet)
{
if (!m_isUp) {
return;
@@ -91,22 +98,21 @@ TopologyLink::transmit(TopologyNode i, const Block& packet)
continue;
}
InternalTransportBase* recipient = p.second.transport;
this->scheduleReceive(recipient, packet);
this->scheduleReceive(p.second.transport, Block{packet});
}
}
void
TopologyLink::scheduleReceive(InternalTransportBase* recipient, const Block& packet)
TopologyLink::scheduleReceive(face::InternalTransportBase* recipient, Block&& packet)
{
getScheduler().schedule(m_delay, [packet, recipient] {
recipient->receiveFromLink(packet);
getScheduler().schedule(m_delay, [=, pkt = std::move(packet)] () mutable {
recipient->receivePacket(std::move(pkt));
});
}
TopologyAppLink::TopologyAppLink(shared_ptr<Face> forwarderFace)
: m_face(forwarderFace)
, m_forwarderTransport(static_cast<InternalForwarderTransport*>(forwarderFace->getTransport()))
: m_face(std::move(forwarderFace))
, m_forwarderTransport(static_cast<InternalForwarderTransport*>(m_face->getTransport()))
, m_clientTransport(make_shared<InternalClientTransport>())
, m_client(make_shared<ndn::Face>(m_clientTransport, getGlobalIoService()))
{
@@ -190,7 +196,7 @@ TopologyTester::addLink(const std::string& label, time::nanoseconds delay,
auto face = make_shared<Face>(std::move(service), std::move(transport));
forwarder.addFace(face);
link->addFace(i, face);
link->addFace(i, std::move(face));
}
m_links.push_back(link); // keep a shared_ptr so callers don't have to
@@ -212,7 +218,7 @@ TopologyTester::addAppFace(const std::string& label, TopologyNode i)
forwarder.addFace(face);
auto al = make_shared<TopologyAppLink>(face);
auto al = make_shared<TopologyAppLink>(std::move(face));
m_appLinks.push_back(al); // keep a shared_ptr so callers don't have to
return al;
}
+33 -6
View File
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2018, Regents of the University of California,
* Copyright (c) 2014-2019, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -108,21 +108,48 @@ public:
private:
void
transmit(TopologyNode i, const Block& packet);
transmit(TopologyNode i, Block&& packet);
void
scheduleReceive(face::InternalTransportBase* recipient, const Block& packet);
scheduleReceive(face::InternalTransportBase* recipient, Block&& packet);
private:
bool m_isUp;
bool m_isUp = true;
time::nanoseconds m_delay;
struct NodeTransport
class ReceiveProxy : public face::InternalTransportBase
{
face::InternalTransportBase* transport;
public:
using Callback = std::function<void(Block&&)>;
explicit
ReceiveProxy(Callback cb)
: m_cb(std::move(cb))
{
}
void
receivePacket(Block&& packet) final
{
m_cb(std::move(packet));
}
private:
Callback m_cb;
};
class NodeTransport
{
public:
NodeTransport(shared_ptr<Face> face, ReceiveProxy::Callback receiveCallback);
public:
shared_ptr<Face> face;
face::InternalForwarderTransport* transport;
ReceiveProxy proxy;
std::set<TopologyNode> blockedDestinations;
};
std::unordered_map<TopologyNode, NodeTransport> m_transports;
};