diff --git a/daemon/face/internal-transport.cpp b/daemon/face/internal-transport.cpp index 3e9552c0..5b3fcac3 100644 --- a/daemon/face/internal-transport.cpp +++ b/daemon/face/internal-transport.cpp @@ -46,21 +46,21 @@ InternalForwarderTransport::InternalForwarderTransport(const FaceUri& localUri, } void -InternalForwarderTransport::receiveFromLink(const Block& packet) +InternalForwarderTransport::receivePacket(Block&& packet) { - NFD_LOG_FACE_TRACE(__func__); - - Packet p; - p.packet = packet; - this->receive(std::move(p)); + getGlobalIoService().post([this, pkt = std::move(packet)] () mutable { + NFD_LOG_FACE_TRACE("Received: " << pkt.size() << " bytes"); + receive(Packet{std::move(pkt)}); + }); } void InternalForwarderTransport::doSend(Packet&& packet) { - NFD_LOG_FACE_TRACE(__func__); + NFD_LOG_FACE_TRACE("Sending to " << m_peer); - this->emitSignal(afterSend, packet.packet); + if (m_peer) + m_peer->receivePacket(std::move(packet.packet)); } void @@ -68,50 +68,59 @@ InternalForwarderTransport::doClose() { NFD_LOG_FACE_TRACE(__func__); - this->setState(TransportState::CLOSED); + setState(TransportState::CLOSED); } -static void -asyncReceive(InternalTransportBase* recipient, const Block& packet) +InternalClientTransport::~InternalClientTransport() { - getGlobalIoService().post([packet, recipient] { - recipient->receiveFromLink(packet); - }); + if (m_forwarder != nullptr) { + m_forwarder->setPeer(nullptr); + } } void -InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarderTransport) +InternalClientTransport::connectToForwarder(InternalForwarderTransport* forwarder) { - NFD_LOG_DEBUG(__func__ << " " << forwarderTransport); + NFD_LOG_DEBUG(__func__ << " " << forwarder); - m_fwToClientTransmitConn.disconnect(); - m_clientToFwTransmitConn.disconnect(); - m_fwTransportStateConn.disconnect(); + if (m_forwarder != nullptr) { + // disconnect from the old forwarder transport + m_forwarder->setPeer(nullptr); + m_fwTransportStateConn.disconnect(); + } - if (forwarderTransport != nullptr) { - m_fwToClientTransmitConn = forwarderTransport->afterSend.connect(bind(&asyncReceive, this, _1)); - m_clientToFwTransmitConn = this->afterSend.connect(bind(&asyncReceive, forwarderTransport, _1)); - m_fwTransportStateConn = forwarderTransport->afterStateChange.connect( + m_forwarder = forwarder; + + if (m_forwarder != nullptr) { + // connect to the new forwarder transport + m_forwarder->setPeer(this); + m_fwTransportStateConn = m_forwarder->afterStateChange.connect( [this] (TransportState oldState, TransportState newState) { if (newState == TransportState::CLOSED) { - this->connectToForwarder(nullptr); + connectToForwarder(nullptr); } }); } } void -InternalClientTransport::receiveFromLink(const Block& packet) +InternalClientTransport::receivePacket(Block&& packet) { - if (m_receiveCallback) { - m_receiveCallback(packet); - } + getGlobalIoService().post([this, pkt = std::move(packet)] { + NFD_LOG_TRACE("Received: " << pkt.size() << " bytes"); + if (m_receiveCallback) { + m_receiveCallback(pkt); + } + }); } void InternalClientTransport::send(const Block& wire) { - this->emitSignal(afterSend, wire); + NFD_LOG_TRACE("Sending to " << m_forwarder); + + if (m_forwarder) + m_forwarder->receivePacket(Block{wire}); } void @@ -121,7 +130,7 @@ InternalClientTransport::send(const Block& header, const Block& payload) encoder.appendByteArray(header.wire(), header.size()); encoder.appendByteArray(payload.wire(), payload.size()); - this->send(encoder.block()); + send(encoder.block()); } } // namespace face diff --git a/daemon/face/internal-transport.hpp b/daemon/face/internal-transport.hpp index 9f834d71..f859099c 100644 --- a/daemon/face/internal-transport.hpp +++ b/daemon/face/internal-transport.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2018, Regents of the University of California, + * Copyright (c) 2014-2019, Regents of the University of California, * Arizona Board of Regents, * Colorado State University, * University Pierre & Marie Curie, Sorbonne University, @@ -33,7 +33,7 @@ namespace nfd { namespace face { -/** \brief abstracts a transport that can be paired with another +/** \brief Abstracts a transport that can be paired with another. */ class InternalTransportBase { @@ -41,49 +41,53 @@ public: virtual ~InternalTransportBase() = default; - /** \brief causes the transport to receive a link-layer packet - */ virtual void - receiveFromLink(const Block& packet) = 0; - - signal::Signal afterSend; - -protected: - DECLARE_SIGNAL_EMIT(afterSend) + receivePacket(Block&& packet) = 0; }; -/** \brief implements a forwarder-side transport that can be paired with another +/** \brief Implements a forwarder-side transport that can be paired with another transport. */ -class InternalForwarderTransport : public Transport, public InternalTransportBase +class InternalForwarderTransport final : public Transport, public InternalTransportBase { public: + explicit InternalForwarderTransport(const FaceUri& localUri = FaceUri("internal://"), const FaceUri& remoteUri = FaceUri("internal://"), ndn::nfd::FaceScope scope = ndn::nfd::FACE_SCOPE_LOCAL, ndn::nfd::LinkType linkType = ndn::nfd::LINK_TYPE_POINT_TO_POINT); void - receiveFromLink(const Block& packet) override; + setPeer(InternalTransportBase* peer) + { + m_peer = peer; + } + + void + receivePacket(Block&& packet) final; protected: void - doClose() override; + doClose() final; private: void - doSend(Packet&& packet) override; + doSend(Packet&& packet) final; private: NFD_LOG_MEMBER_DECL(); + + InternalTransportBase* m_peer = nullptr; }; -/** \brief implements a client-side transport that can be paired with another +/** \brief Implements a client-side transport that can be paired with an InternalForwarderTransport. */ -class InternalClientTransport : public ndn::Transport, public InternalTransportBase +class InternalClientTransport final : public ndn::Transport, public InternalTransportBase { public: - /** \brief connect to a forwarder-side transport - * \param forwarderTransport the forwarder-side transport to connect to; may be nullptr + ~InternalClientTransport() final; + + /** \brief Connect to a forwarder-side transport. + * \param forwarder the forwarder-side transport to connect to; may be nullptr * * The connected forwarder-side transport will be disconnected automatically if this method * is called again, or if that transport is closed. @@ -91,37 +95,36 @@ public: * all sent packets would be lost, and nothing would be received. */ void - connectToForwarder(InternalForwarderTransport* forwarderTransport); + connectToForwarder(InternalForwarderTransport* forwarder); void - receiveFromLink(const Block& packet) override; + receivePacket(Block&& packet) final; void - close() override + send(const Block& wire) final; + + void + send(const Block& header, const Block& payload) final; + + void + close() final { } void - pause() override + pause() final { } void - resume() override + resume() final { } - void - send(const Block& wire) override; - - void - send(const Block& header, const Block& payload) override; - private: NFD_LOG_MEMBER_DECL(); - signal::ScopedConnection m_fwToClientTransmitConn; - signal::ScopedConnection m_clientToFwTransmitConn; + InternalForwarderTransport* m_forwarder = nullptr; signal::ScopedConnection m_fwTransportStateConn; }; diff --git a/tests/daemon/face/internal-face.t.cpp b/tests/daemon/face/internal-face.t.cpp index 9ea69035..ecd1d002 100644 --- a/tests/daemon/face/internal-face.t.cpp +++ b/tests/daemon/face/internal-face.t.cpp @@ -42,7 +42,7 @@ class InternalFaceFixture : public GlobalIoTimeFixture, public KeyChainFixture public: InternalFaceFixture() { - std::tie(forwarderFace, clientFace) = makeInternalFace(m_keyChain);; + std::tie(forwarderFace, clientFace) = makeInternalFace(m_keyChain); forwarderFace->afterReceiveInterest.connect( [this] (const Interest& interest) { receivedInterests.push_back(interest); } ); @@ -82,7 +82,7 @@ BOOST_AUTO_TEST_CASE(TransportStaticProperties) BOOST_AUTO_TEST_CASE(ReceiveInterestTimeout) { - shared_ptr interest = makeInterest("/TLETccRv"); + auto interest = makeInterest("/TLETccRv"); interest->setInterestLifetime(100_ms); bool hasTimeout = false; @@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestTimeout) BOOST_AUTO_TEST_CASE(ReceiveInterestSendData) { - shared_ptr interest = makeInterest("/PQstEJGdL"); + auto interest = makeInterest("/PQstEJGdL"); bool hasReceivedData = false; clientFace->expressInterest(*interest, @@ -117,8 +117,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestSendData) BOOST_REQUIRE_EQUAL(receivedInterests.size(), 1); BOOST_CHECK_EQUAL(receivedInterests.back().getName(), "/PQstEJGdL"); - shared_ptr data = makeData("/PQstEJGdL/aI7oCrDXNX"); - forwarderFace->sendData(*data); + forwarderFace->sendData(*makeData("/PQstEJGdL/aI7oCrDXNX")); this->advanceClocks(1_ms, 10); BOOST_CHECK(hasReceivedData); @@ -126,7 +125,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestSendData) BOOST_AUTO_TEST_CASE(ReceiveInterestSendNack) { - shared_ptr interest = makeInterest("/1HrsRM1X", 152); + auto interest = makeInterest("/1HrsRM1X", 152); bool hasReceivedNack = false; clientFace->expressInterest(*interest, @@ -141,8 +140,7 @@ BOOST_AUTO_TEST_CASE(ReceiveInterestSendNack) BOOST_REQUIRE_EQUAL(receivedInterests.size(), 1); BOOST_CHECK_EQUAL(receivedInterests.back().getName(), "/1HrsRM1X"); - lp::Nack nack = makeNack("/1HrsRM1X", 152, lp::NackReason::NO_ROUTE); - forwarderFace->sendNack(nack); + forwarderFace->sendNack(makeNack("/1HrsRM1X", 152, lp::NackReason::NO_ROUTE)); this->advanceClocks(1_ms, 10); BOOST_CHECK(hasReceivedNack); @@ -156,12 +154,10 @@ BOOST_AUTO_TEST_CASE(SendInterestReceiveData) hasDeliveredInterest = true; BOOST_CHECK_EQUAL(interest.getName(), "/Wpc8TnEeoF/f6SzV8hD"); - shared_ptr data = makeData("/Wpc8TnEeoF/f6SzV8hD/3uytUJCuIi"); - clientFace->put(*data); + clientFace->put(*makeData("/Wpc8TnEeoF/f6SzV8hD/3uytUJCuIi")); }); - shared_ptr interest = makeInterest("/Wpc8TnEeoF/f6SzV8hD"); - forwarderFace->sendInterest(*interest); + forwarderFace->sendInterest(*makeInterest("/Wpc8TnEeoF/f6SzV8hD")); this->advanceClocks(1_ms, 10); BOOST_CHECK(hasDeliveredInterest); @@ -177,12 +173,10 @@ BOOST_AUTO_TEST_CASE(SendInterestReceiveNack) hasDeliveredInterest = true; BOOST_CHECK_EQUAL(interest.getName(), "/4YgJKWcXN/5oaTe05o"); - lp::Nack nack = makeNack("/4YgJKWcXN/5oaTe05o", 191, lp::NackReason::NO_ROUTE); - clientFace->put(nack); + clientFace->put(makeNack("/4YgJKWcXN/5oaTe05o", 191, lp::NackReason::NO_ROUTE)); }); - shared_ptr interest = makeInterest("/4YgJKWcXN/5oaTe05o", 191); - forwarderFace->sendInterest(*interest); + forwarderFace->sendInterest(*makeInterest("/4YgJKWcXN/5oaTe05o", 191)); this->advanceClocks(1_ms, 10); BOOST_CHECK(hasDeliveredInterest); @@ -197,7 +191,7 @@ BOOST_AUTO_TEST_CASE(CloseForwarderFace) BOOST_CHECK_EQUAL(forwarderFace->getState(), FaceState::CLOSED); forwarderFace.reset(); - shared_ptr interest = makeInterest("/zpHsVesu0B"); + auto interest = makeInterest("/zpHsVesu0B"); interest->setInterestLifetime(100_ms); bool hasTimeout = false; @@ -216,8 +210,7 @@ BOOST_AUTO_TEST_CASE(CloseClientFace) g_io.poll(); // #3248 workaround clientFace.reset(); - shared_ptr interest = makeInterest("/aau42XQqb"); - forwarderFace->sendInterest(*interest); + forwarderFace->sendInterest(*makeInterest("/aau42XQqb")); BOOST_CHECK_NO_THROW(this->advanceClocks(1_ms, 10)); } diff --git a/tests/daemon/fw/topology-tester.cpp b/tests/daemon/fw/topology-tester.cpp index c401aefe..0e19b57b 100644 --- a/tests/daemon/fw/topology-tester.cpp +++ b/tests/daemon/fw/topology-tester.cpp @@ -24,6 +24,7 @@ */ #include "topology-tester.hpp" + #include "daemon/global.hpp" #include "face/generic-link-service.hpp" @@ -33,13 +34,19 @@ namespace nfd { namespace fw { namespace tests { -using face::InternalTransportBase; -using face::InternalForwarderTransport; -using face::InternalClientTransport; using face::GenericLinkService; +using face::InternalClientTransport; +using face::InternalForwarderTransport; + +TopologyLink::NodeTransport::NodeTransport(shared_ptr f, ReceiveProxy::Callback cb) + : face(std::move(f)) + , transport(dynamic_cast(face->getTransport())) + , proxy(std::move(cb)) +{ + BOOST_ASSERT(transport != nullptr); +} TopologyLink::TopologyLink(time::nanoseconds delay) - : m_isUp(true) { this->setDelay(delay); } @@ -66,19 +73,19 @@ TopologyLink::setDelay(time::nanoseconds delay) void TopologyLink::addFace(TopologyNode i, shared_ptr face) { - BOOST_ASSERT(m_transports.count(i) == 0); - auto& nodeTransport = m_transports[i]; + auto receiveCb = [this, i] (Block&& pkt) { transmit(i, std::move(pkt)); }; - nodeTransport.face = face; + auto ret = m_transports.emplace(std::piecewise_construct, + std::forward_as_tuple(i), + std::forward_as_tuple(std::move(face), std::move(receiveCb))); + BOOST_ASSERT(ret.second); - nodeTransport.transport = dynamic_cast(face->getTransport()); - BOOST_ASSERT(nodeTransport.transport != nullptr); - nodeTransport.transport->afterSend.connect( - [this, i] (const Block& packet) { this->transmit(i, packet); }); + auto& node = ret.first->second; + node.transport->setPeer(&node.proxy); } void -TopologyLink::transmit(TopologyNode i, const Block& packet) +TopologyLink::transmit(TopologyNode i, Block&& packet) { if (!m_isUp) { return; @@ -91,22 +98,21 @@ TopologyLink::transmit(TopologyNode i, const Block& packet) continue; } - InternalTransportBase* recipient = p.second.transport; - this->scheduleReceive(recipient, packet); + this->scheduleReceive(p.second.transport, Block{packet}); } } void -TopologyLink::scheduleReceive(InternalTransportBase* recipient, const Block& packet) +TopologyLink::scheduleReceive(face::InternalTransportBase* recipient, Block&& packet) { - getScheduler().schedule(m_delay, [packet, recipient] { - recipient->receiveFromLink(packet); + getScheduler().schedule(m_delay, [=, pkt = std::move(packet)] () mutable { + recipient->receivePacket(std::move(pkt)); }); } TopologyAppLink::TopologyAppLink(shared_ptr forwarderFace) - : m_face(forwarderFace) - , m_forwarderTransport(static_cast(forwarderFace->getTransport())) + : m_face(std::move(forwarderFace)) + , m_forwarderTransport(static_cast(m_face->getTransport())) , m_clientTransport(make_shared()) , m_client(make_shared(m_clientTransport, getGlobalIoService())) { @@ -190,7 +196,7 @@ TopologyTester::addLink(const std::string& label, time::nanoseconds delay, auto face = make_shared(std::move(service), std::move(transport)); forwarder.addFace(face); - link->addFace(i, face); + link->addFace(i, std::move(face)); } m_links.push_back(link); // keep a shared_ptr so callers don't have to @@ -212,7 +218,7 @@ TopologyTester::addAppFace(const std::string& label, TopologyNode i) forwarder.addFace(face); - auto al = make_shared(face); + auto al = make_shared(std::move(face)); m_appLinks.push_back(al); // keep a shared_ptr so callers don't have to return al; } diff --git a/tests/daemon/fw/topology-tester.hpp b/tests/daemon/fw/topology-tester.hpp index cbd8fd99..2c8d7d08 100644 --- a/tests/daemon/fw/topology-tester.hpp +++ b/tests/daemon/fw/topology-tester.hpp @@ -1,6 +1,6 @@ /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */ /* - * Copyright (c) 2014-2018, Regents of the University of California, + * Copyright (c) 2014-2019, Regents of the University of California, * Arizona Board of Regents, * Colorado State University, * University Pierre & Marie Curie, Sorbonne University, @@ -108,21 +108,48 @@ public: private: void - transmit(TopologyNode i, const Block& packet); + transmit(TopologyNode i, Block&& packet); void - scheduleReceive(face::InternalTransportBase* recipient, const Block& packet); + scheduleReceive(face::InternalTransportBase* recipient, Block&& packet); private: - bool m_isUp; + bool m_isUp = true; time::nanoseconds m_delay; - struct NodeTransport + class ReceiveProxy : public face::InternalTransportBase { - face::InternalTransportBase* transport; + public: + using Callback = std::function; + + explicit + ReceiveProxy(Callback cb) + : m_cb(std::move(cb)) + { + } + + void + receivePacket(Block&& packet) final + { + m_cb(std::move(packet)); + } + + private: + Callback m_cb; + }; + + class NodeTransport + { + public: + NodeTransport(shared_ptr face, ReceiveProxy::Callback receiveCallback); + + public: shared_ptr face; + face::InternalForwarderTransport* transport; + ReceiveProxy proxy; std::set blockedDestinations; }; + std::unordered_map m_transports; };