8 Commits

Author SHA1 Message Date
baihe 785cfde06d fix: qsccp compile error 2024-01-21 12:01:02 +08:00
baihe 507c60422f fix: cc client compile error 2024-01-20 22:16:50 +08:00
SunnyQjm a6e53e66e4 feat(cc): Add bbr consuemr 2024-01-20 13:12:22 +08:00
SunnyQjm 9724c94e33 feat(cc): Add Pcon Consumer 2024-01-16 23:12:45 +08:00
baihe 3168ad7bd1 feat(cc): Pcon support retrans timer 2024-01-16 23:11:51 +08:00
root 9648808618 fix(cc): delay need time 2023-12-18 20:57:24 +08:00
root 1675c0aca0 feat(cc): Update 2023-12-18 16:39:41 +08:00
SunnyQjm a23e941b13 feat(cc): Add cc-producer and cc-client 2023-12-15 14:57:02 +00:00
31 changed files with 3940 additions and 0 deletions
+1
View File
@@ -68,6 +68,7 @@
#include <ndn-cxx/util/scheduler.hpp>
#include <ndn-cxx/util/signal.hpp>
#include <ndn-cxx/util/time.hpp>
#include <iostream>
namespace ndn {
+206
View File
@@ -0,0 +1,206 @@
#include "bbr-consumer.hpp"
#include <cmath>
namespace ndn
{
namespace cc
{
namespace client
{
BbrConsumer::BbrConsumer(Face &face, const Options &options)
: Consumer(face, options),
m_inFlight(0),
fixedRate(options.fixedRate),
delayStart(options.delayStart),
timingStop(options.timingStop),
delayGreedy(options.delayGreedy),
greedyRate(options.greedyRate),
dsz(options.dsz),
m_minRtt(std::numeric_limits<double>::max()),
m_maxBandwidth(0.0),
m_PacingGain(2 / std::log(2)),
m_CwndGain(2 / std::log(2)),
m_mode(STARTUP),
m_roundCount(0)
{
}
void BbrConsumer::startGreedy()
{
this->fixedRate = this->greedyRate;
}
void BbrConsumer::scheduleNextPacket()
{
if (m_stopFlag)
{
return;
}
if (this->m_firstTime)
{
this->m_firstTime = false;
if (this->timingStop > 0)
{
m_scheduler.schedule(time::milliseconds(this->timingStop), [this]
{ stop(); });
}
if (this->delayGreedy > 0 && this->greedyRate > 0)
{
m_scheduler.schedule(time::milliseconds(this->delayGreedy), [this]
{ startGreedy(); });
}
m_nextInterestEvent = m_scheduler.schedule(time::milliseconds(this->delayStart), [this]
{ sendPacket(); });
return;
}
if (this->fixedRate > 0)
{
auto waitTime = (this->dsz * 1000000000) / this->fixedRate + 1;
m_nextInterestEvent = m_scheduler.schedule(time::nanoseconds(waitTime), [this]
{ sendPacket(); });
return;
}
if (!m_nextInterestEvent)
{
auto bdp = m_minRtt * m_maxBandwidth;
// std::cout << "BDP: " << bdp << ", inFlight: " << m_inFlight << ", minRtt: " << m_minRtt << ", maxBandwidth: " << m_maxBandwidth << std::endl;
if (m_mode == DRAIN && m_inFlight < bdp)
{
doGainCycle();
enterProbeBandwidthMode();
// Enter ProbeRTT mode every 10 seconds
m_scheduler.schedule(time::seconds(10), [this]
{ enterProbeRttMode(); });
}
if (m_mode == PROBE_RTT)
{
if (m_inFlight >= 4)
{
return;
}
m_nextInterestEvent = m_scheduler.schedule(time::milliseconds(250), [this]
{ sendPacket(); });
}
else
{
if (m_inFlight >= m_CwndGain * bdp)
{
return;
}
uint32_t waitTime = m_packetSize / (m_PacingGain * m_maxBandwidth) * 1000000000;
// std::cout << "waitTime: " << m_packetSize / (m_PacingGain * m_maxBandwidth) << std::endl;
m_nextInterestEvent = m_scheduler.schedule(time::nanoseconds(waitTime), [this]
{ sendPacket(); });
}
}
}
void BbrConsumer::willSendInterest(uint32_t seq)
{
m_inFlight++;
Consumer::willSendInterest(seq);
}
void BbrConsumer::onTimeout(uint32_t seq)
{
if (m_inFlight > static_cast<uint32_t>(0))
{
m_inFlight--;
}
Consumer::onTimeout(seq);
}
void BbrConsumer::onData(const Data &data, uint32_t seq, const BbrPacketInfo &bbrInfo)
{
Consumer::onData(data, seq, bbrInfo);
if (!bbrInfo.isRetx)
{
auto now = time::steady_clock::now();
// 1. Update minimum RTT
auto rtt = m_rtt.duration_to_second_double(now - bbrInfo.sendTime);
if (m_minRtt > rtt)
{
m_minRtt = rtt;
}
// 2. Calculate the current delivery rate
auto delivered_rate = (m_delivered - bbrInfo.deliveredAtSend) / m_rtt.duration_to_second_double((m_deliveredTime - bbrInfo.deliveredTime));
// std::cout << "delivered_rate: " << delivered_rate << ", m_delivered: " << m_delivered << ", deliveredAtSend: " << bbrInfo.deliveredAtSend << ", rtt: " << m_rtt.duration_to_second_double(m_deliveredTime - bbrInfo.deliveredTime) << std::endl;
if (delivered_rate > m_maxBandwidth)
{
m_maxBandwidth = delivered_rate;
}
else if (m_mode == STARTUP)
{
m_mode = DRAIN;
m_PacingGain = std::log(2) / 2;
}
}
if (m_inFlight > static_cast<uint32_t>(0))
{
m_inFlight--;
}
scheduleNextPacket();
}
void BbrConsumer::doGainCycle()
{
m_roundCount++;
auto gainStage = m_roundCount % 8;
if (gainStage == 0)
{
m_PacingGain = 1.25;
}
else if (gainStage == 1)
{
m_PacingGain = 0.75;
}
else
{
m_PacingGain = 1;
}
uint32_t waitTime = m_minRtt * 1000000000;
m_doGainCycleEvent = m_scheduler.schedule(time::nanoseconds(waitTime), [this]
{ doGainCycle(); });
}
void BbrConsumer::enterProbeBandwidthMode()
{
m_mode = PROBE_BW;
}
void BbrConsumer::enterProbeRttMode()
{
m_mode = PROBE_RTT;
auto finishRttProbeDuration = m_minRtt;
if (finishRttProbeDuration > 0.2)
{
finishRttProbeDuration = 0.2;
}
uint32_t waitTime = finishRttProbeDuration * 1000000000;
// Leave PROBE_RTT mode after max(rtt, 200ms)
m_scheduler.schedule(time::nanoseconds(waitTime),
[this]
{
enterProbeBandwidthMode();
});
// Enter PROBE_RTT again after 10 seconds
m_probeRTTEvent = m_scheduler.schedule(time::seconds(10), [this]
{ enterProbeRttMode(); });
}
}
}
}
+64
View File
@@ -0,0 +1,64 @@
#ifndef BBR_CONSUMER_H
#define BBR_CONSUMER_H
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
class BbrConsumer : public Consumer
{
public:
enum BbrMode
{
STARTUP,
DRAIN,
PROBE_BW,
PROBE_RTT
};
explicit BbrConsumer(Face &face, const Options &options);
virtual void onData(const Data &data, uint32_t seq, const BbrPacketInfo &bbrInfo);
virtual void onTimeout(uint32_t seq);
virtual void willSendInterest(uint32_t seq);
protected:
virtual void scheduleNextPacket();
void startGreedy();
void enterProbeBandwidthMode();
void enterProbeRttMode();
void doGainCycle();
private:
// window
uint32_t m_inFlight;
int32_t fixedRate;
uint64_t delayStart;
int32_t timingStop;
int32_t delayGreedy;
int32_t greedyRate;
uint64_t dsz;
// bbr
double m_minRtt;
double m_maxBandwidth;
double m_PacingGain;
double m_CwndGain;
BbrMode m_mode;
uint32_t m_roundCount;
};
}
}
}
#endif // BBR_CONSUMER_H
+152
View File
@@ -0,0 +1,152 @@
#include "core/common.hpp"
#include "core/version.hpp"
#include "bbr-consumer.hpp"
#include <iostream>
namespace ndn
{
namespace cc
{
namespace client
{
class Runner : noncopyable
{
public:
explicit Runner(const Options &options) : m_consumer(m_face, options)
{
m_consumer.afterFinish.connect([this]
{ this->cancel(); });
}
int
run()
{
try
{
m_consumer.start();
m_face.processEvents();
}
catch (const std::exception &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
// m_tracer.onError(e.what());
return 2;
}
return 0;
}
private:
void
cancel()
{
m_consumer.stop();
}
private:
Face m_face;
BbrConsumer m_consumer;
};
static void
usage(const boost::program_options::options_description &options)
{
std::cout << "Usage: bbr-client [options] ndn:/name/prefix\n"
"\n"
"Ping a NDN name prefix using Interests with name ndn:/name/prefix/ping/number.\n"
"The numbers in the Interests are randomly generated unless specified.\n"
"\n"
<< options;
exit(2);
}
static int
main(int argc, char *argv[])
{
Options options;
// options.shouldAllowStaleData = false;
// options.nPings = -1;
// options.interval = time::milliseconds(getDefaultPingInterval());
// options.timeout = time::milliseconds(getDefaultPingTimeoutThreshold());
// options.startSeq = 0;
// options.shouldGenerateRandomSeq = true;
// options.shouldPrintTimestamp = false;
// std::string identifier;
namespace po = boost::program_options;
po::options_description visibleOptDesc("Options");
visibleOptDesc.add_options()("help,h", "print this message and exit")("version,V", "display version and exit");
visibleOptDesc.add_options()(
"startSeq", po::value<uint32_t>(&options.startSeq)->default_value(0), "start sequence number");
visibleOptDesc.add_options()(
"seqMax", po::value<int64_t>(&options.seqMax)->default_value(-1), "maximum sequence number");
visibleOptDesc.add_options()(
"dsz", po::value<uint32_t>(&options.dsz)->default_value(8624), "data size");
visibleOptDesc.add_options()(
"delayStart", po::value<uint32_t>(&options.delayStart)->default_value(0), "delay start time, in milliseconds");
visibleOptDesc.add_options()(
"fixedRate", po::value<int32_t>(&options.fixedRate)->default_value(-1), "fixed rate, in milliseconds");
visibleOptDesc.add_options()(
"timingStop", po::value<int32_t>(&options.timingStop)->default_value(-1), "timing stop, in milliseconds");
visibleOptDesc.add_options()(
"delayGreedy", po::value<int32_t>(&options.delayGreedy)->default_value(-1), "delay greedy, in milliseconds");
visibleOptDesc.add_options()(
"greedyRate", po::value<int32_t>(&options.greedyRate)->default_value(-1), "greedy rate, in milliseconds");
visibleOptDesc.add_options()(
"lifetime", po::value<time::milliseconds::rep>()->default_value(4000), "Interest lifetime, in milliseconds");
po::options_description hiddenOptDesc;
hiddenOptDesc.add_options()("prefix", po::value<std::string>(), "content prefix to request");
po::options_description optDesc;
optDesc.add(visibleOptDesc).add(hiddenOptDesc);
po::positional_options_description optPos;
optPos.add("prefix", -1);
try
{
po::variables_map optVm;
po::store(po::command_line_parser(argc, argv).options(optDesc).positional(optPos).run(), optVm);
po::notify(optVm);
if (optVm.count("help") > 0)
{
usage(visibleOptDesc);
}
if (optVm.count("version") > 0)
{
std::cout << "qsccp-client " << tools::VERSION << std::endl;
exit(0);
}
if (optVm.count("prefix") > 0)
{
options.prefix = Name(optVm["prefix"].as<std::string>());
}
else
{
std::cerr << "ERROR: No prefix specified" << std::endl;
usage(visibleOptDesc);
}
options.lifetime = time::milliseconds(optVm["lifetime"].as<time::milliseconds::rep>());
}
catch (const po::error &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
usage(visibleOptDesc);
}
return Runner(options).run();
}
}
}
}
int main(int argc, char *argv[])
{
return ndn::cc::client::main(argc, argv);
}
+212
View File
@@ -0,0 +1,212 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
Consumer::Consumer(Face &face, const Options &options)
: m_options(options),
m_nSent(0),
m_nextSeq(options.startSeq),
m_seqMax(options.seqMax),
m_nOutstanding(0),
m_face(face),
m_scheduler(m_face.getIoService()),
m_stopFlag(false),
m_recvBytes(0),
m_traceTimes(0),
m_firstTime(true),
m_delivered(0),
m_deliveredTime(time::steady_clock::now()),
m_packetSize(1067)
{
if (m_options.seqMax < 0)
{
m_seqMax = std::numeric_limits<uint32_t>::max();
}
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
m_scheduler.schedule(
time::milliseconds(50),
bind(&Consumer::checkRetxTimeout, this));
}
void Consumer::traceRate()
{
if (!m_stopFlag)
{
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
}
m_traceTimes++;
// print rate
std::cout << "cc:rate:<" << m_traceTimes << "," << m_recvBytes << ">" << m_nSent << std::endl;
m_recvBytes = 0;
m_nSent = 0;
}
void Consumer::start()
{
m_stopFlag = false;
scheduleNextPacket();
}
void Consumer::stop()
{
m_nextInterestEvent.cancel();
m_retxEvent.cancel();
m_probeRTTEvent.cancel();
m_doGainCycleEvent.cancel();
m_stopFlag = true;
}
void Consumer::checkRetxTimeout()
{
auto now = time::steady_clock::now();
auto rto = m_rtt.RetransmitTimeout();
std::cout << "RTO: " << m_rtt.duration_to_second_double(rto) << std::endl;
while (!m_seqTimeouts.empty())
{
SeqTimeoutsContainer::index<i_timestamp>::type::iterator entry =
m_seqTimeouts.get<i_timestamp>().begin();
if (entry->time + rto <= now) // timeout expired?
{
// std::cout << "RTOTIMEOUT: " << entry->seq << std::endl;
uint32_t seqNo = entry->seq;
if (m_seqRetxCounts.find(seqNo) != m_seqRetxCounts.end())
{
m_seqTimeouts.get<i_timestamp>().erase(entry);
onTimeout(seqNo);
}
}
else
break; // nothing else to do. All later packets need not be retransmitted
}
m_retxEvent = m_scheduler.schedule(time::milliseconds(50), bind(&Consumer::checkRetxTimeout, this));
}
void Consumer::sendPacket()
{
uint32_t seq = std::numeric_limits<uint32_t>::max(); // invalid
bool isRetx = false;
while (m_retxSeqs.size())
{
seq = *m_retxSeqs.begin();
m_retxSeqs.erase(m_retxSeqs.begin());
isRetx = true;
break;
}
if (seq == std::numeric_limits<uint32_t>::max())
{
if (m_seqMax != std::numeric_limits<uint32_t>::max())
{
if (m_nextSeq >= m_seqMax)
{
return; // we are totally done
}
}
seq = m_nextSeq++;
}
Interest interest(makeInterestName(seq));
interest.setCanBePrefix(false);
interest.setMustBeFresh(true);
interest.setServiceClass(5);
interest.setInterestLifetime(m_options.lifetime);
auto now = time::steady_clock::now();
BbrPacketInfo bbrPacketInfo{
now,
m_delivered,
m_deliveredTime,
isRetx
};
m_face.expressInterest(interest,
bind(&Consumer::onData, this, _2, seq, bbrPacketInfo),
bind(&Consumer::onNack, this, _2, seq, bbrPacketInfo),
bind(&Consumer::onTimeout, this, seq));
willSendInterest(seq);
scheduleNextPacket();
}
void Consumer::onTimeout(uint32_t seq)
{
std::cout << "onTimeout: " << seq << std::endl;
afterTimeout(seq);
m_rtt.IncreaseMultiplier(); // Double the next RTO
m_rtt.SentSeq(seq, 1); // make sure to disable RTT calculation for this sample
m_retxSeqs.insert(seq);
scheduleNextPacket();
// finish();
}
void Consumer::onData(const Data &data, uint32_t seq, const BbrPacketInfo &bbrInfo)
{
auto now = time::steady_clock::now();
std::cout << "cc:delay:<" << (now - bbrInfo.sendTime).count() << "," << seq << "," << (now - bbrInfo.sendTime).count() << ">" << std::endl;
// afterData(seq, now - sendTime);
if (m_seqRetxCounts.find(seq) != m_seqRetxCounts.end())
{
auto dataSize = data.wireEncode().size();
m_packetSize = 0.8 * m_packetSize + 0.2 * dataSize;
m_recvBytes += dataSize;
m_delivered += dataSize;
m_deliveredTime = now;
m_seqRetxCounts.erase(seq);
}
m_seqFullDelay.erase(seq);
m_seqLastDelay.erase(seq);
m_seqTimeouts.erase(seq);
m_retxSeqs.erase(seq);
m_rtt.AckSeq(seq);
// finish();
}
void Consumer::onNack(const lp::Nack &nack, uint32_t seq, const BbrPacketInfo &bbrInfo)
{
std::cout << "onNack: " << nack.getReason() << std::endl;
afterNack(seq, time::steady_clock::now() - bbrInfo.sendTime, nack.getHeader());
scheduleNextPacket();
// finish();
}
Name Consumer::makeInterestName(uint32_t seq)
{
return Name(m_options.prefix).appendNumber(seq);
}
void Consumer::willSendInterest(uint32_t seq)
{
++m_nSent;
m_seqTimeouts.insert(SeqTimeout(seq, time::steady_clock::now()));
m_seqFullDelay.insert(SeqTimeout(seq, time::steady_clock::now()));
m_seqLastDelay.erase(seq);
m_seqLastDelay.insert(SeqTimeout(seq, time::steady_clock::now()));
m_seqRetxCounts[seq]++;
m_rtt.SentSeq(seq, 1);
}
}
}
}
+193
View File
@@ -0,0 +1,193 @@
#ifndef NDN_CONSUMER_H
#define NDN_CONSUMER_H
#include "core/common.hpp"
#include "ndn-rtt-mean-deviation.hpp"
#include <set>
#include <map>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
namespace ndn
{
namespace cc
{
namespace client
{
typedef time::duration<double, time::milliseconds::period> Rtt;
enum CcAlgorithm
{
AIMD,
BIC,
CUBIC
};
class Options
{
public:
Name prefix;
uint32_t startSeq; // Initial sequence number
time::milliseconds lifetime; // Lifetime of Interest
int64_t seqMax; // Max sequence number
uint32_t tos; // Type of Service
uint32_t dsz; // Data size
int32_t reqNum; // Request Data Num(-1 indicator infinity)
uint32_t initialSendRate; // Initial Send Rate
int32_t fixedRate; // Fixed Send Rate
int32_t timingStop; // Timing Stop(-1 indicator infinity)
int32_t delayGreedy; // Start greedy after the specified time (ms)
int32_t greedyRate; // Greedy Send Rate(-1 indicator no greedy)
uint32_t delayStart; // Delay Start(ms)
};
struct BbrPacketInfo {
// Interest packet send time
time::steady_clock::TimePoint sendTime;
// Total data bytes received while send this Interest
uint64_t deliveredAtSend;
// Last acked data received time while send this Interest
time::steady_clock::TimePoint deliveredTime;
bool isRetx;
};
typedef struct BbrPacketInfo BbrPacketInfo;
class Consumer : noncopyable
{
public:
Consumer(Face &face, const Options &options);
virtual ~Consumer() {}
signal::Signal<Consumer, uint32_t, Rtt> afterData;
signal::Signal<Consumer, uint32_t, Rtt, lp::NackHeader> afterNack;
signal::Signal<Consumer, uint32_t> afterTimeout;
signal::Signal<Consumer> afterFinish;
void start();
void stop();
public:
Name makeInterestName(uint32_t seq);
virtual void sendPacket();
virtual void onTimeout(uint32_t seq);
virtual void onData(const Data &data, uint32_t seq, const BbrPacketInfo &bbrInfo);
virtual void onNack(const lp::Nack &nack, uint32_t seq, const BbrPacketInfo &bbrInfo);
virtual void willSendInterest(uint32_t seq);
protected:
virtual void scheduleNextPacket() = 0;
/**
* \brief Checks if the packet need to be retransmitted becuase of retransmission timer expiration
*/
void
checkRetxTimeout();
private:
void traceRate();
protected:
const Options &m_options;
int m_nSent;
uint32_t m_nextSeq;
int64_t m_seqMax;
int m_nOutstanding;
Face &m_face;
Scheduler m_scheduler;
int m_stopFlag;
uint64_t m_recvBytes;
uint64_t m_traceTimes;
time::steady_clock::TimePoint m_startTime;
scheduler::EventId m_nextInterestEvent;
scheduler::EventId m_probeRTTEvent;
scheduler::EventId m_doGainCycleEvent;
bool m_firstTime = true;
// bbr
uint64_t m_delivered;
time::steady_clock::TimePoint m_deliveredTime;
uint32_t m_packetSize = 8624;
/// @cond include_hidden
/**
* \struct This struct contains sequence numbers of packets to be retransmitted
*/
struct RetxSeqsContainer : public std::set<uint32_t>
{
};
RetxSeqsContainer m_retxSeqs; ///< \brief ordered set of sequence numbers to be retransmitted
RttMeanDeviation m_rtt;
scheduler::EventId m_retxEvent;
// RttEstimator m_rtt;
/**
* \struct This struct contains a pair of packet sequence number and its timeout
*/
struct SeqTimeout
{
SeqTimeout(uint32_t _seq, time::steady_clock::TimePoint _time)
: seq(_seq), time(_time)
{
}
uint32_t seq;
time::steady_clock::TimePoint time;
};
/// @endcond
/// @cond include_hidden
class i_seq
{
};
class i_timestamp
{
};
/// @endcond
/// @cond include_hidden
/**
* \struct This struct contains a multi-index for the set of SeqTimeout structs
*/
struct SeqTimeoutsContainer
: public boost::multi_index::
multi_index_container<SeqTimeout,
boost::multi_index::
indexed_by<boost::multi_index::
ordered_unique<boost::multi_index::tag<i_seq>,
boost::multi_index::
member<SeqTimeout, uint32_t,
&SeqTimeout::seq>>,
boost::multi_index::
ordered_non_unique<boost::multi_index::
tag<i_timestamp>,
boost::multi_index::
member<SeqTimeout, time::steady_clock::TimePoint,
&SeqTimeout::time>>>>
{
};
SeqTimeoutsContainer m_seqTimeouts; ///< \brief multi-index for the set of SeqTimeout structs
SeqTimeoutsContainer m_seqLastDelay;
SeqTimeoutsContainer m_seqFullDelay;
std::map<uint32_t, uint32_t> m_seqRetxCounts;
};
}
}
}
#endif /* RTT_ESTIMATOR_H */
+169
View File
@@ -0,0 +1,169 @@
#include "ndn-rtt-estimator.hpp"
#include <cmath>
namespace ndn
{
namespace cc
{
namespace client
{
void
RttEstimator::SetMinRto(time::steady_clock::duration minRto)
{
m_minRto = minRto;
}
time::steady_clock::duration
RttEstimator::GetMinRto(void) const
{
return m_minRto;
}
void
RttEstimator::SetMaxRto(time::steady_clock::duration maxRto)
{
m_maxRto = maxRto;
}
time::steady_clock::duration
RttEstimator::GetMaxRto(void) const
{
return m_maxRto;
}
void
RttEstimator::SetCurrentEstimate(time::steady_clock::duration estimate)
{
m_currentEstimatedRtt = estimate;
}
time::steady_clock::duration
RttEstimator::GetCurrentEstimate(void) const
{
return m_currentEstimatedRtt;
}
// RttHistory methods
RttHistory::RttHistory(uint32_t s, uint32_t c, time::steady_clock::TimePoint t)
: seq(s), count(c), time(t), retx(false)
{
}
RttHistory::RttHistory(const RttHistory &h)
: seq(h.seq), count(h.count), time(h.time), retx(h.retx)
{
}
// Base class methods
RttEstimator::RttEstimator()
: m_next(1),
m_maxMultiplier(64),
m_initialEstimatedRtt(second_double_to_duration(1)), // 1 seconds
m_minRto(second_double_to_duration(0.2)), // 0.2 seconds
m_maxRto(second_double_to_duration(200)), // 200 seconds
m_nSamples(0),
m_multiplier(1),
m_history()
{
m_currentEstimatedRtt = m_initialEstimatedRtt;
}
RttEstimator::RttEstimator(const RttEstimator &c)
: m_next(c.m_next), m_maxMultiplier(c.m_maxMultiplier), m_initialEstimatedRtt(c.m_initialEstimatedRtt), m_currentEstimatedRtt(c.m_currentEstimatedRtt), m_minRto(c.m_minRto), m_maxRto(c.m_maxRto), m_nSamples(c.m_nSamples), m_multiplier(c.m_multiplier), m_history(c.m_history)
{
}
RttEstimator::~RttEstimator()
{
}
void
RttEstimator::SentSeq(uint32_t seq, uint32_t size)
{
// Note that a particular sequence has been sent
if (seq == m_next)
{ // This is the next expected one, just log at end
m_history.push_back(RttHistory(seq, size, time::steady_clock::now()));
m_next = seq + uint32_t(size); // Update next expected
}
else
{ // This is a retransmit, find in list and mark as re-tx
for (RttHistory_t::iterator i = m_history.begin(); i != m_history.end(); ++i)
{
if ((seq >= i->seq) && (seq < (i->seq + uint32_t(i->count))))
{ // Found it
i->retx = true;
// One final test..be sure this re-tx does not extend "next"
if ((seq + uint32_t(size)) > m_next)
{
m_next = seq + uint32_t(size);
i->count = ((seq + uint32_t(size)) - i->seq); // And update count in hist
}
break;
}
}
}
}
time::steady_clock::duration
RttEstimator::AckSeq(uint32_t ackSeq)
{
// An ack has been received, calculate rtt and log this measurement
// Note we use a linear search (O(n)) for this since for the common
// case the ack'ed packet will be at the head of the list
time::steady_clock::duration m = time::steady_clock::duration::zero();
if (m_history.size() == 0)
{
return (m); // No pending history, just exit
}
RttHistory &h = m_history.front();
if (!h.retx && ackSeq >= (h.seq + uint32_t(h.count)))
{ // Ok to use this sample
m = time::steady_clock::now() - h.time; // Elapsed time::nanoseconds
Measurement(m); // Log the measurement
ResetMultiplier(); // Reset multiplier on valid measurement
}
// Now delete all ack history with seq <= ack
while (m_history.size() > 0)
{
RttHistory &h = m_history.front();
if ((h.seq + uint32_t(h.count)) > ackSeq)
break; // Done removing
m_history.pop_front(); // Remove
}
return m;
}
void
RttEstimator::ClearSent()
{
// Clear all history entries
m_next = 1;
m_history.clear();
}
void
RttEstimator::IncreaseMultiplier()
{
m_multiplier = (m_multiplier * 2 < m_maxMultiplier) ? m_multiplier * 2 : m_maxMultiplier;
}
void
RttEstimator::ResetMultiplier()
{
m_multiplier = 1;
}
void
RttEstimator::Reset()
{
// Reset to initial state
m_next = 1;
m_currentEstimatedRtt = m_initialEstimatedRtt;
m_history.clear(); // Remove all info from the history
m_nSamples = 0;
ResetMultiplier();
}
}
}
}
+166
View File
@@ -0,0 +1,166 @@
#ifndef NDN_RTT_ESTIMATOR_H
#define NDN_RTT_ESTIMATOR_H
#include "core/common.hpp"
#include <deque>
namespace ndn
{
namespace cc
{
namespace client
{
/**
* \ingroup ndn-apps
*
* \brief Helper class to store RTT measurements
*/
class RttHistory
{
public:
RttHistory(uint32_t seq, uint32_t size, time::steady_clock::TimePoint time);
RttHistory(const RttHistory &h); // Copy constructor
public:
uint32_t seq; // First sequence number in packet sent
uint32_t count; // Number of bytes sent
time::steady_clock::TimePoint time; // Time this one was sent
bool retx; // True if this has been retransmitted
};
typedef std::deque<RttHistory> RttHistory_t;
/**
* \ingroup tcp
*
* \brief Base class for all RTT Estimators
*/
class RttEstimator
{
public:
RttEstimator();
RttEstimator(const RttEstimator &);
virtual ~RttEstimator();
/**
* \brief Note that a particular sequence has been sent
* \param seq the packet sequence number.
* \param size the packet size.
*/
virtual void
SentSeq(uint32_t seq, uint32_t size);
/**
* \brief Note that a particular ack sequence has been received
* \param ackSeq the ack sequence number.
* \return The measured RTT for this ack.
*/
virtual time::steady_clock::duration
AckSeq(uint32_t ackSeq);
/**
* \brief Clear all history entries
*/
virtual void
ClearSent();
/**
* \brief Add a new measurement to the estimator. Pure virtual function.
* \param t the new RTT measure.
*/
virtual void
Measurement(time::steady_clock::duration t) = 0;
/**
* \brief Returns the estimated RTO. Pure virtual function.
* \return the estimated RTO.
*/
virtual time::steady_clock::duration
RetransmitTimeout() = 0;
/**
* \brief Increase the estimation multiplier up to MaxMultiplier.
*/
virtual void
IncreaseMultiplier();
/**
* \brief Resets the estimation multiplier to 1.
*/
virtual void
ResetMultiplier();
/**
* \brief Resets the estimation to its initial state.
*/
virtual void
Reset();
/**
* \brief Sets the Minimum RTO.
* \param minRto The minimum RTO returned by the estimator.
*/
void
SetMinRto(time::steady_clock::duration minRto);
/**
* \brief Get the Minimum RTO.
* \return The minimum RTO returned by the estimator.
*/
time::steady_clock::duration
GetMinRto(void) const;
/**
* \brief Sets the Maximum RTO.
* \param minRto The maximum RTO returned by the estimator.
*/
void
SetMaxRto(time::steady_clock::duration maxRto);
/**
* \brief Get the Maximum RTO.
* \return The maximum RTO returned by the estimator.
*/
time::steady_clock::duration
GetMaxRto(void) const;
/**
* \brief Sets the current RTT estimate (forcefully).
* \param estimate The current RTT estimate.
*/
void
SetCurrentEstimate(time::steady_clock::duration estimate);
/**
* \brief gets the current RTT estimate.
* \return The current RTT estimate.
*/
time::steady_clock::duration
GetCurrentEstimate(void) const;
double
duration_to_second_double(time::steady_clock::duration d) {
return d.count() / 1000000000.0;
}
time::steady_clock::duration
second_double_to_duration(double d) {
return time::steady_clock::duration(static_cast<long long>(d * 1000000000.0));
}
private:
uint32_t m_next; // Next expected sequence to be sent
uint16_t m_maxMultiplier;
time::steady_clock::duration m_initialEstimatedRtt;
protected:
time::steady_clock::duration m_currentEstimatedRtt; // Current estimate
time::steady_clock::duration m_minRto; // minimum value of the timeout
time::steady_clock::duration m_maxRto; // maximum value of the timeout
uint32_t m_nSamples; // Number of samples
uint16_t m_multiplier; // RTO Multiplier
RttHistory_t m_history; // List of sent packet
};
}
}
}
#endif /* RTT_ESTIMATOR_H */
+119
View File
@@ -0,0 +1,119 @@
#include "ndn-rtt-mean-deviation.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
RttMeanDeviation::RttMeanDeviation()
: m_gain(0.125),
m_gain2(0.25),
m_variance(0)
{
}
RttMeanDeviation::RttMeanDeviation(const RttMeanDeviation &c)
: RttEstimator(c), m_gain(c.m_gain), m_gain2(c.m_gain2), m_variance(c.m_variance)
{
}
void
RttMeanDeviation::Measurement(time::steady_clock::duration m)
{
if (m_nSamples)
{ // Not first
time::steady_clock::duration err(m - m_currentEstimatedRtt);
double gErr = duration_to_second_double(err) * m_gain;
m_currentEstimatedRtt += second_double_to_duration(gErr);
auto abs_err = err > time::steady_clock::duration::zero() ? err : -err;
time::steady_clock::duration difference = abs_err - m_variance;
m_variance += second_double_to_duration(m_gain2 * duration_to_second_double(difference));
}
else
{ // First sample
m_currentEstimatedRtt = m; // Set estimate to current
// variance = sample / 2; // And variance to current / 2
// m_variance = m; // try this why????
m_variance = second_double_to_duration(duration_to_second_double(m) / 2);
}
m_nSamples++;
}
time::steady_clock::duration
RttMeanDeviation::RetransmitTimeout()
{
// std::cout << "RTO -> m_currentEstimatedRtt:" << duration_to_second_double(m_currentEstimatedRtt) << ", m_variance:" << duration_to_second_double(m_variance) << std::endl;
double retval = std::min(
duration_to_second_double(m_maxRto),
std::max(
m_multiplier * duration_to_second_double(m_minRto),
m_multiplier * (duration_to_second_double(m_currentEstimatedRtt) + 4 * duration_to_second_double(m_variance))));
return second_double_to_duration(retval);
}
void
RttMeanDeviation::Reset()
{
// Reset to initial state
m_variance = time::steady_clock::duration::zero();
RttEstimator::Reset();
}
void
RttMeanDeviation::Gain(double g)
{
m_gain = g;
}
void
RttMeanDeviation::SentSeq(uint32_t seq, uint32_t size)
{
RttHistory_t::iterator i;
for (i = m_history.begin(); i != m_history.end(); ++i)
{
if (seq == i->seq)
{ // Found it
i->retx = true;
break;
}
}
// Note that a particular sequence has been sent
if (i == m_history.end())
m_history.push_back(RttHistory(seq, size, time::steady_clock::now()));
}
time::steady_clock::duration
RttMeanDeviation::AckSeq(uint32_t ackSeq)
{
// An ack has been received, calculate rtt and log this measurement
// Note we use a linear search (O(n)) for this since for the common
// case the ack'ed packet will be at the head of the list
auto m = time::steady_clock::duration::zero();
if (m_history.size() == 0)
return (m); // No pending history, just exit
for (RttHistory_t::iterator i = m_history.begin(); i != m_history.end(); ++i)
{
if (ackSeq == i->seq)
{ // Found it
if (!i->retx)
{
m = time::steady_clock::now() - i->time; // Elapsed time
Measurement(m); // Log the measurement
ResetMultiplier(); // Reset multiplier on valid measurement
}
m_history.erase(i);
break;
}
}
return m;
}
}
}
}
+57
View File
@@ -0,0 +1,57 @@
#ifndef NDN_RTT_MEAN_DEVIATION_H
#define NDN_RTT_MEAN_DEVIATION_H
#include "ndn-rtt-estimator.hpp"
#endif /* NDN_RTT_MEAN_DEVIATION_H */
namespace ndn
{
namespace cc
{
namespace client
{
/**
* \ingroup ndn-apps
*
* \brief The modified version of "Mean--Deviation" RTT estimator, as discussed by Van Jacobson that
*better suits NDN communication model
*
* This class implements the "Mean--Deviation" RTT estimator, as discussed
* by Van Jacobson and Michael J. Karels, in
* "Congestion Avoidance and Control", SIGCOMM 88, Appendix A
*
*/
class RttMeanDeviation : public RttEstimator
{
public:
RttMeanDeviation();
RttMeanDeviation(const RttMeanDeviation &);
void
SentSeq(uint32_t seq, uint32_t size);
time::steady_clock::duration
AckSeq(uint32_t ackSeq);
void
Measurement(time::steady_clock::duration measure);
time::steady_clock::duration
RetransmitTimeout();
void
Reset();
void
Gain(double g);
private:
double m_gain; // Filter gain
double m_gain2; // Filter gain
time::steady_clock::duration m_variance; // Current variance
};
}
}
}
+157
View File
@@ -0,0 +1,157 @@
#include "core/common.hpp"
#include "core/version.hpp"
#include "qsccp-consumer.hpp"
#include <iostream>
namespace ndn
{
namespace cc
{
namespace client
{
class Runner : noncopyable
{
public:
explicit Runner(const Options &options) : m_consumer(m_face, options)
{
m_consumer.afterFinish.connect([this]
{ this->cancel(); });
}
int
run()
{
try
{
m_consumer.start();
m_face.processEvents();
}
catch (const std::exception &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
// m_tracer.onError(e.what());
return 2;
}
return 0;
}
private:
void
cancel()
{
m_consumer.stop();
}
private:
Face m_face;
QsccpConsumer m_consumer;
};
static void
usage(const boost::program_options::options_description &options)
{
std::cout << "Usage: ndnping [options] ndn:/name/prefix\n"
"\n"
"Ping a NDN name prefix using Interests with name ndn:/name/prefix/ping/number.\n"
"The numbers in the Interests are randomly generated unless specified.\n"
"\n"
<< options;
exit(2);
}
static int
main(int argc, char *argv[])
{
Options options;
// options.shouldAllowStaleData = false;
// options.nPings = -1;
// options.interval = time::milliseconds(getDefaultPingInterval());
// options.timeout = time::milliseconds(getDefaultPingTimeoutThreshold());
// options.startSeq = 0;
// options.shouldGenerateRandomSeq = true;
// options.shouldPrintTimestamp = false;
// std::string identifier;
namespace po = boost::program_options;
po::options_description visibleOptDesc("Options");
visibleOptDesc.add_options()("help,h", "print this message and exit")("version,V", "display version and exit");
visibleOptDesc.add_options()(
"startSeq", po::value<uint64_t>(&options.startSeq)->default_value(0), "start sequence number");
visibleOptDesc.add_options()(
"seqMax", po::value<int64_t>(&options.seqMax)->default_value(-1), "maximum sequence number");
visibleOptDesc.add_options()(
"tos", po::value<uint32_t>(&options.tos)->default_value(5), "set the TOS field");
visibleOptDesc.add_options()(
"dsz", po::value<uint32_t>(&options.dsz)->default_value(8624), "data size");
visibleOptDesc.add_options()(
"delayStart", po::value<uint32_t>(&options.delayStart)->default_value(0), "delay start time, in milliseconds");
visibleOptDesc.add_options()(
"initialSendRate", po::value<uint32_t>(&options.initialSendRate)->default_value(0), "initial send rate, in milliseconds");
visibleOptDesc.add_options()(
"fixedRate", po::value<int32_t>(&options.fixedRate)->default_value(-1), "fixed rate, in milliseconds");
visibleOptDesc.add_options()(
"timingStop", po::value<int32_t>(&options.timingStop)->default_value(-1), "timing stop, in milliseconds");
visibleOptDesc.add_options()(
"delayGreedy", po::value<int32_t>(&options.delayGreedy)->default_value(-1), "delay greedy, in milliseconds");
visibleOptDesc.add_options()(
"greedyRate", po::value<int32_t>(&options.greedyRate)->default_value(-1), "greedy rate, in milliseconds");
visibleOptDesc.add_options()(
"lifetime", po::value<time::milliseconds::rep>()->default_value(4000), "Interest lifetime, in milliseconds");
po::options_description hiddenOptDesc;
hiddenOptDesc.add_options()("prefix", po::value<std::string>(), "content prefix to request");
po::options_description optDesc;
optDesc.add(visibleOptDesc).add(hiddenOptDesc);
po::positional_options_description optPos;
optPos.add("prefix", -1);
try
{
po::variables_map optVm;
po::store(po::command_line_parser(argc, argv).options(optDesc).positional(optPos).run(), optVm);
po::notify(optVm);
if (optVm.count("help") > 0)
{
usage(visibleOptDesc);
}
if (optVm.count("version") > 0)
{
std::cout << "qsccp-client " << tools::VERSION << std::endl;
exit(0);
}
if (optVm.count("prefix") > 0)
{
options.prefix = Name(optVm["prefix"].as<std::string>());
}
else
{
std::cerr << "ERROR: No prefix specified" << std::endl;
usage(visibleOptDesc);
}
options.lifetime = time::milliseconds(optVm["lifetime"].as<time::milliseconds::rep>());
}
catch (const po::error &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
usage(visibleOptDesc);
}
std::cout << "PING " << options.prefix << std::endl;
return Runner(options).run();
}
}
}
}
int main(int argc, char *argv[])
{
return ndn::cc::client::main(argc, argv);
}
+140
View File
@@ -0,0 +1,140 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
Consumer::Consumer(Face &face, const Options &options)
: m_options(options),
m_nSent(0),
m_nextSeq(options.startSeq),
m_seqMax(options.seqMax),
m_nOutstanding(0),
m_face(face),
m_scheduler(m_face.getIoService()),
m_stopFlag(false),
m_recvBytes(0),
m_traceTimes(0)
{
if (m_options.seqMax < 0)
{
m_seqMax = std::numeric_limits<uint64_t>::max();
}
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
}
void Consumer::traceRate()
{
if (!m_stopFlag)
{
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
}
m_traceTimes++;
// print rate
std::cout << "cc:rate:<" << m_traceTimes << "," << m_recvBytes << ">" << m_nSent << std::endl;
m_recvBytes = 0;
m_nSent = 0;
}
void Consumer::start()
{
m_stopFlag = false;
scheduleNextPacket();
}
void Consumer::stop()
{
m_nextInterestEvent.cancel();
m_stopFlag = true;
}
void Consumer::sendPacket()
{
uint32_t seq = std::numeric_limits<uint32_t>::max(); // invalid
while (m_retxSeqs.size())
{
seq = *m_retxSeqs.begin();
m_retxSeqs.erase(m_retxSeqs.begin());
break;
}
if (seq == std::numeric_limits<uint32_t>::max())
{
if (m_seqMax != std::numeric_limits<uint32_t>::max())
{
if (m_nextSeq >= m_seqMax)
{
return; // we are totally done
}
}
seq = m_nextSeq++;
}
Interest interest(makeInterestName(seq));
interest.setCanBePrefix(false);
interest.setMustBeFresh(true);
interest.setInterestLifetime(m_options.lifetime);
auto now = time::steady_clock::now();
m_face.expressInterest(interest,
bind(&Consumer::onData, this, _2, seq, now),
bind(&Consumer::onNack, this, _2, seq, now),
bind(&Consumer::onTimeout, this, seq));
++m_nSent;
++m_nOutstanding;
scheduleNextPacket();
// if (m_nSent < m_seqMax)
// {
// m_nextInterestEvent = m_scheduler.schedule(m_options.interval, [this]
// { scheduleNextInterest(); });
// }
// else
// {
// finish();
// }
}
void Consumer::onTimeout(uint64_t seq)
{
m_retxSeqs.insert(seq);
afterTimeout(seq);
scheduleNextPacket();
// finish();
}
void Consumer::onData(const Data &data, uint64_t seq, const time::steady_clock::TimePoint &sendTime)
{
auto now = time::steady_clock::now();
std::cout << "cc:delay:<" << (now - m_startTime).count() << "," << seq << "," << (now - sendTime).count() << ">" << std::endl;
afterData(seq, now - sendTime);
m_recvBytes += data.wireEncode().size();
// finish();
}
void Consumer::onNack(const lp::Nack &nack, uint64_t seq, const time::steady_clock::TimePoint &sendTime)
{
std::cout << "onNack: " << nack.getReason() << std::endl;
afterNack(seq, time::steady_clock::now() - sendTime, nack.getHeader());
scheduleNextPacket();
// finish();
}
Name Consumer::makeInterestName(uint64_t seq)
{
return Name(m_options.prefix).appendNumber(seq);
}
}
}
}
+90
View File
@@ -0,0 +1,90 @@
#include "core/common.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
typedef time::duration<double, time::milliseconds::period> Rtt;
class Options
{
public:
Name prefix;
uint64_t startSeq; // Initial sequence number
time::milliseconds lifetime; // Lifetime of Interest
int64_t seqMax; // Max sequence number
uint32_t tos; // Type of Service
uint32_t dsz; // Data size
int32_t reqNum; // Request Data Num(-1 indicator infinity)
uint32_t initialSendRate; // Initial Send Rate
int32_t fixedRate; // Fixed Send Rate
int32_t timingStop; // Timing Stop(-1 indicator infinity)
int32_t delayGreedy; // Start greedy after the specified time (ms)
int32_t greedyRate; // Greedy Send Rate(-1 indicator no greedy)
uint32_t delayStart; // Delay Start(ms)
};
class Consumer : noncopyable
{
public:
Consumer(Face &face, const Options &options);
virtual ~Consumer(){}
signal::Signal<Consumer, uint64_t, Rtt> afterData;
signal::Signal<Consumer, uint64_t, Rtt, lp::NackHeader> afterNack;
signal::Signal<Consumer, uint64_t> afterTimeout;
signal::Signal<Consumer> afterFinish;
void start();
void stop();
public:
Name makeInterestName(uint64_t seq);
virtual void sendPacket();
virtual void onTimeout(uint64_t seq);
virtual void onData(const Data& data, uint64_t seq, const time::steady_clock::TimePoint &sendTime);
virtual void onNack(const lp::Nack &nack, uint64_t seq, const time::steady_clock::TimePoint &sendTime);
protected:
virtual void scheduleNextPacket() = 0;
private:
void traceRate();
protected:
const Options &m_options;
int m_nSent;
uint64_t m_nextSeq;
int64_t m_seqMax;
int m_nOutstanding;
Face &m_face;
Scheduler m_scheduler;
int m_stopFlag;
uint64_t m_recvBytes;
uint64_t m_traceTimes;
time::steady_clock::TimePoint m_startTime;
/// @cond include_hidden
/**
* \struct This struct contains sequence numbers of packets to be retransmitted
*/
struct RetxSeqsContainer : public std::set<uint32_t>
{
};
RetxSeqsContainer m_retxSeqs; ///< \brief ordered set of sequence numbers to be retransmitted
scheduler::EventId m_nextInterestEvent;
};
}
}
}
+160
View File
@@ -0,0 +1,160 @@
#include "qsccp-consumer.hpp"
#include <iostream>
namespace ndn
{
namespace cc
{
namespace client
{
QsccpConsumer::QsccpConsumer(Face &face, const Options &options)
: Consumer(face, options),
m_firstTime(true),
tos(options.tos),
delayStart(options.delayStart),
dsz(options.dsz),
sendRate(options.initialSendRate),
fixedRate(options.fixedRate),
timingStop(options.timingStop),
delayGreedy(options.delayGreedy),
greedyRate(options.greedyRate),
m_recvDataNum(0)
{
}
uint64_t QsccpConsumer::updateRate(uint64_t newRate)
{
if (this->fixedRate > 0)
{
return this->fixedRate;
}
if (this->sendRate == 0)
{
this->sendRate = newRate;
}
else
{
this->sendRate = 0.8 * this->sendRate + 0.2 * newRate;
}
return this->sendRate;
}
void QsccpConsumer::startGreedy()
{
this->fixedRate = this->greedyRate;
this->sendRate = this->greedyRate;
}
void QsccpConsumer::onData(const Data &data, uint64_t seq, const time::steady_clock::TimePoint &sendTime)
{
// print tags
auto targetRate = data.getTargetRate();
// std::cout << "onData: " << seq << ", targetRate: " << *targetRate << std::endl;
if (targetRate)
{
this->updateRate(*targetRate);
}
else
{
// NS_LOG_DEBUG("no target rate");
}
scheduleNextPacket();
Consumer::onData(data, seq, sendTime);
++this->m_recvDataNum;
if (this->m_seqMax == this->m_recvDataNum)
{
this->stop();
return;
}
}
void QsccpConsumer::scheduleNextPacket()
{
if (m_stopFlag)
{
return;
}
if (this->m_firstTime)
{
m_startTime = time::steady_clock::now();
this->m_firstTime = false;
if (this->fixedRate > 0)
{
this->sendRate = fixedRate;
}
if (this->timingStop > 0)
{
m_scheduler.schedule(time::milliseconds(this->timingStop), [this]
{
stop(); });
}
if (this->delayGreedy > 0 && this->greedyRate > 0)
{
m_scheduler.schedule(time::milliseconds(this->delayGreedy), [this]
{ startGreedy(); });
}
m_nextInterestEvent = m_scheduler.schedule(time::milliseconds(this->delayStart), [this]
{ sendPacket(); });
}
else if (!m_nextInterestEvent)
{
if (this->sendRate <= 0)
{
return;
}
auto waitTime = (this->dsz * 1000000000) / this->sendRate + 1;
m_nextInterestEvent = m_scheduler.schedule(time::nanoseconds(waitTime), [this]
{ sendPacket(); });
}
}
void QsccpConsumer::sendPacket()
{
scheduleNextPacket();
// send packet here
uint32_t seq = std::numeric_limits<uint32_t>::max(); // invalid
while (m_retxSeqs.size())
{
seq = *m_retxSeqs.begin();
m_retxSeqs.erase(m_retxSeqs.begin());
break;
}
if (seq == std::numeric_limits<uint32_t>::max())
{
if (m_seqMax != std::numeric_limits<uint32_t>::max())
{
if (m_nextSeq >= m_seqMax)
{
stop();
// finish();
return; // we are totally done
}
}
seq = m_nextSeq++;
}
Interest interest(makeInterestName(seq));
interest.setCanBePrefix(false);
interest.setMustBeFresh(true);
interest.setServiceClass(this->tos);
interest.setDsz(this->dsz);
interest.setInterestLifetime(m_options.lifetime);
auto now = time::steady_clock::now();
m_face.expressInterest(interest,
bind(&Consumer::onData, this, _2, seq, now),
bind(&Consumer::onNack, this, _2, seq, now),
bind(&Consumer::onTimeout, this, seq));
++m_nSent;
++m_nOutstanding;
}
}
}
}
+41
View File
@@ -0,0 +1,41 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
class QsccpConsumer : public Consumer
{
public:
explicit QsccpConsumer(Face &face, const Options &options);
void sendPacket() override;
void onData(const Data& data, uint64_t seq, const time::steady_clock::TimePoint &sendTime) override;
protected:
void scheduleNextPacket() override;
void startGreedy();
private:
uint64_t updateRate(uint64_t newRate);
private:
bool m_firstTime = true;
// private attribute here
uint64_t tos;
uint64_t delayStart;
uint64_t dsz;
uint64_t sendRate;
int32_t fixedRate;
int32_t timingStop;
int32_t delayGreedy;
int32_t greedyRate;
uint64_t m_recvDataNum;
};
}
}
}
+194
View File
@@ -0,0 +1,194 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
Consumer::Consumer(Face &face, const Options &options)
: m_options(options),
m_nSent(0),
m_nextSeq(options.startSeq),
m_seqMax(options.seqMax),
m_nOutstanding(0),
m_face(face),
m_scheduler(m_face.getIoService()),
m_stopFlag(false),
m_recvBytes(0),
m_traceTimes(0),
m_firstTime(true)
{
if (m_options.seqMax < 0)
{
m_seqMax = std::numeric_limits<uint32_t>::max();
}
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
m_scheduler.schedule(
time::milliseconds(50),
bind(&Consumer::checkRetxTimeout, this));
}
void Consumer::traceRate()
{
if (!m_stopFlag)
{
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
}
m_traceTimes++;
// print rate
std::cout << "cc:rate:<" << m_traceTimes << "," << m_recvBytes << ">" << m_nSent << std::endl;
m_recvBytes = 0;
m_nSent = 0;
}
void Consumer::start()
{
m_stopFlag = false;
scheduleNextPacket();
}
void Consumer::stop()
{
m_nextInterestEvent.cancel();
m_retxEvent.cancel();
m_stopFlag = true;
}
void Consumer::checkRetxTimeout()
{
auto now = time::steady_clock::now();
auto rto = m_rtt.RetransmitTimeout();
std::cout << "RTO: " << m_rtt.duration_to_second_double(rto) << std::endl;
while (!m_seqTimeouts.empty())
{
SeqTimeoutsContainer::index<i_timestamp>::type::iterator entry =
m_seqTimeouts.get<i_timestamp>().begin();
if (entry->time + rto <= now) // timeout expired?
{
// std::cout << "RTOTIMEOUT: " << entry->seq << std::endl;
uint32_t seqNo = entry->seq;
if (m_seqRetxCounts.find(seqNo) != m_seqRetxCounts.end())
{
m_seqTimeouts.get<i_timestamp>().erase(entry);
onTimeout(seqNo);
}
}
else
break; // nothing else to do. All later packets need not be retransmitted
}
m_retxEvent = m_scheduler.schedule(time::milliseconds(50), bind(&Consumer::checkRetxTimeout, this));
}
void Consumer::sendPacket()
{
uint32_t seq = std::numeric_limits<uint32_t>::max(); // invalid
while (m_retxSeqs.size())
{
seq = *m_retxSeqs.begin();
m_retxSeqs.erase(m_retxSeqs.begin());
break;
}
if (seq == std::numeric_limits<uint32_t>::max())
{
if (m_seqMax != std::numeric_limits<uint32_t>::max())
{
if (m_nextSeq >= m_seqMax)
{
return; // we are totally done
}
}
seq = m_nextSeq++;
}
Interest interest(makeInterestName(seq));
interest.setCanBePrefix(false);
interest.setMustBeFresh(true);
interest.setServiceClass(5);
interest.setInterestLifetime(m_options.lifetime);
auto now = time::steady_clock::now();
m_face.expressInterest(interest,
bind(&Consumer::onData, this, _2, seq, now),
bind(&Consumer::onNack, this, _2, seq, now),
bind(&Consumer::onTimeout, this, seq));
willSendInterest(seq);
scheduleNextPacket();
}
void Consumer::onTimeout(uint32_t seq)
{
std::cout << "onTimeout: " << seq << std::endl;
afterTimeout(seq);
m_rtt.IncreaseMultiplier(); // Double the next RTO
m_rtt.SentSeq(seq, 1); // make sure to disable RTT calculation for this sample
m_retxSeqs.insert(seq);
scheduleNextPacket();
// finish();
}
void Consumer::onData(const Data &data, uint32_t seq, const time::steady_clock::TimePoint &sendTime)
{
auto now = time::steady_clock::now();
std::cout << "cc:delay:<" << (now - m_startTime).count() << "," << seq << "," << (now - sendTime).count() << ">" << std::endl;
afterData(seq, now - sendTime);
if (m_seqRetxCounts.find(seq) != m_seqRetxCounts.end())
{
m_recvBytes += data.wireEncode().size();
m_seqRetxCounts.erase(seq);
}
m_seqFullDelay.erase(seq);
m_seqLastDelay.erase(seq);
m_seqTimeouts.erase(seq);
m_retxSeqs.erase(seq);
m_rtt.AckSeq(seq);
// finish();
}
void Consumer::onNack(const lp::Nack &nack, uint32_t seq, const time::steady_clock::TimePoint &sendTime)
{
std::cout << "onNack: " << nack.getReason() << std::endl;
afterNack(seq, time::steady_clock::now() - sendTime, nack.getHeader());
scheduleNextPacket();
// finish();
}
Name Consumer::makeInterestName(uint32_t seq)
{
return Name(m_options.prefix).appendNumber(seq);
}
void Consumer::willSendInterest(uint32_t seq)
{
++m_nSent;
m_seqTimeouts.insert(SeqTimeout(seq, time::steady_clock::now()));
m_seqFullDelay.insert(SeqTimeout(seq, time::steady_clock::now()));
m_seqLastDelay.erase(seq);
m_seqLastDelay.insert(SeqTimeout(seq, time::steady_clock::now()));
m_seqRetxCounts[seq]++;
m_rtt.SentSeq(seq, 1);
}
}
}
}
+186
View File
@@ -0,0 +1,186 @@
#ifndef NDN_CONSUMER_H
#define NDN_CONSUMER_H
#include "core/common.hpp"
#include "ndn-rtt-mean-deviation.hpp"
#include <set>
#include <map>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
namespace ndn
{
namespace cc
{
namespace client
{
typedef time::duration<double, time::milliseconds::period> Rtt;
enum CcAlgorithm
{
AIMD,
BIC,
CUBIC
};
class Options
{
public:
Name prefix;
uint32_t startSeq; // Initial sequence number
time::milliseconds lifetime; // Lifetime of Interest
int64_t seqMax; // Max sequence number
uint32_t tos; // Type of Service
uint32_t dsz; // Data size
int32_t reqNum; // Request Data Num(-1 indicator infinity)
uint32_t initialSendRate; // Initial Send Rate
int32_t fixedRate; // Fixed Send Rate
int32_t timingStop; // Timing Stop(-1 indicator infinity)
int32_t delayGreedy; // Start greedy after the specified time (ms)
int32_t greedyRate; // Greedy Send Rate(-1 indicator no greedy)
uint32_t delayStart; // Delay Start(ms)
std::string schema; // QSCCP or PCON
uint32_t initialWindowSize = 1; // Initial Window Size
bool setInitialWindowOnTimeout = false; // Set initial window size on timeout
CcAlgorithm ccAlgorithm = AIMD; // Specify which window adaptation algorithm to use (AIMD, BIC, or CUBIC)
double beta = 0.5; // TCP Multiplicative Decrease factor
double cubicBeta = 0.8; // TCP CUBIC Multiplicative Decrease factor
double addRttSuppress = 0.5; // Minimum number of RTTs (1 + this factor) between window decreases
bool reactToCongestionMarks = true; // React to congestion marks (ECN, CE)
bool useCwa = true; // Use Congestion Window Acceleration (CWA) algorithm
bool useCubicFastConv = true; // Use CUBIC fast convergence algorithm
};
class Consumer : noncopyable
{
public:
Consumer(Face &face, const Options &options);
virtual ~Consumer() {}
signal::Signal<Consumer, uint32_t, Rtt> afterData;
signal::Signal<Consumer, uint32_t, Rtt, lp::NackHeader> afterNack;
signal::Signal<Consumer, uint32_t> afterTimeout;
signal::Signal<Consumer> afterFinish;
void start();
void stop();
public:
Name makeInterestName(uint32_t seq);
virtual void sendPacket();
virtual void onTimeout(uint32_t seq);
virtual void onData(const Data &data, uint32_t seq, const time::steady_clock::TimePoint &sendTime);
virtual void onNack(const lp::Nack &nack, uint32_t seq, const time::steady_clock::TimePoint &sendTime);
virtual void willSendInterest(uint32_t seq);
protected:
virtual void scheduleNextPacket() = 0;
/**
* \brief Checks if the packet need to be retransmitted becuase of retransmission timer expiration
*/
void
checkRetxTimeout();
private:
void traceRate();
protected:
const Options &m_options;
int m_nSent;
uint32_t m_nextSeq;
int64_t m_seqMax;
int m_nOutstanding;
Face &m_face;
Scheduler m_scheduler;
int m_stopFlag;
uint64_t m_recvBytes;
uint64_t m_traceTimes;
time::steady_clock::TimePoint m_startTime;
scheduler::EventId m_nextInterestEvent;
bool m_firstTime = true;
/// @cond include_hidden
/**
* \struct This struct contains sequence numbers of packets to be retransmitted
*/
struct RetxSeqsContainer : public std::set<uint32_t>
{
};
RetxSeqsContainer m_retxSeqs; ///< \brief ordered set of sequence numbers to be retransmitted
RttMeanDeviation m_rtt;
scheduler::EventId m_retxEvent;
// RttEstimator m_rtt;
/**
* \struct This struct contains a pair of packet sequence number and its timeout
*/
struct SeqTimeout
{
SeqTimeout(uint32_t _seq, time::steady_clock::TimePoint _time)
: seq(_seq), time(_time)
{
}
uint32_t seq;
time::steady_clock::TimePoint time;
};
/// @endcond
/// @cond include_hidden
class i_seq
{
};
class i_timestamp
{
};
/// @endcond
/// @cond include_hidden
/**
* \struct This struct contains a multi-index for the set of SeqTimeout structs
*/
struct SeqTimeoutsContainer
: public boost::multi_index::
multi_index_container<SeqTimeout,
boost::multi_index::
indexed_by<boost::multi_index::
ordered_unique<boost::multi_index::tag<i_seq>,
boost::multi_index::
member<SeqTimeout, uint32_t,
&SeqTimeout::seq>>,
boost::multi_index::
ordered_non_unique<boost::multi_index::
tag<i_timestamp>,
boost::multi_index::
member<SeqTimeout, time::steady_clock::TimePoint,
&SeqTimeout::time>>>>
{
};
SeqTimeoutsContainer m_seqTimeouts; ///< \brief multi-index for the set of SeqTimeout structs
SeqTimeoutsContainer m_seqLastDelay;
SeqTimeoutsContainer m_seqFullDelay;
std::map<uint32_t, uint32_t> m_seqRetxCounts;
};
}
}
}
#endif /* RTT_ESTIMATOR_H */
+169
View File
@@ -0,0 +1,169 @@
#include "ndn-rtt-estimator.hpp"
#include <cmath>
namespace ndn
{
namespace cc
{
namespace client
{
void
RttEstimator::SetMinRto(time::steady_clock::duration minRto)
{
m_minRto = minRto;
}
time::steady_clock::duration
RttEstimator::GetMinRto(void) const
{
return m_minRto;
}
void
RttEstimator::SetMaxRto(time::steady_clock::duration maxRto)
{
m_maxRto = maxRto;
}
time::steady_clock::duration
RttEstimator::GetMaxRto(void) const
{
return m_maxRto;
}
void
RttEstimator::SetCurrentEstimate(time::steady_clock::duration estimate)
{
m_currentEstimatedRtt = estimate;
}
time::steady_clock::duration
RttEstimator::GetCurrentEstimate(void) const
{
return m_currentEstimatedRtt;
}
// RttHistory methods
RttHistory::RttHistory(uint32_t s, uint32_t c, time::steady_clock::TimePoint t)
: seq(s), count(c), time(t), retx(false)
{
}
RttHistory::RttHistory(const RttHistory &h)
: seq(h.seq), count(h.count), time(h.time), retx(h.retx)
{
}
// Base class methods
RttEstimator::RttEstimator()
: m_next(1),
m_maxMultiplier(64),
m_initialEstimatedRtt(second_double_to_duration(1)), // 1 seconds
m_minRto(second_double_to_duration(0.2)), // 0.2 seconds
m_maxRto(second_double_to_duration(200)), // 200 seconds
m_nSamples(0),
m_multiplier(1),
m_history()
{
m_currentEstimatedRtt = m_initialEstimatedRtt;
}
RttEstimator::RttEstimator(const RttEstimator &c)
: m_next(c.m_next), m_maxMultiplier(c.m_maxMultiplier), m_initialEstimatedRtt(c.m_initialEstimatedRtt), m_currentEstimatedRtt(c.m_currentEstimatedRtt), m_minRto(c.m_minRto), m_maxRto(c.m_maxRto), m_nSamples(c.m_nSamples), m_multiplier(c.m_multiplier), m_history(c.m_history)
{
}
RttEstimator::~RttEstimator()
{
}
void
RttEstimator::SentSeq(uint32_t seq, uint32_t size)
{
// Note that a particular sequence has been sent
if (seq == m_next)
{ // This is the next expected one, just log at end
m_history.push_back(RttHistory(seq, size, time::steady_clock::now()));
m_next = seq + uint32_t(size); // Update next expected
}
else
{ // This is a retransmit, find in list and mark as re-tx
for (RttHistory_t::iterator i = m_history.begin(); i != m_history.end(); ++i)
{
if ((seq >= i->seq) && (seq < (i->seq + uint32_t(i->count))))
{ // Found it
i->retx = true;
// One final test..be sure this re-tx does not extend "next"
if ((seq + uint32_t(size)) > m_next)
{
m_next = seq + uint32_t(size);
i->count = ((seq + uint32_t(size)) - i->seq); // And update count in hist
}
break;
}
}
}
}
time::steady_clock::duration
RttEstimator::AckSeq(uint32_t ackSeq)
{
// An ack has been received, calculate rtt and log this measurement
// Note we use a linear search (O(n)) for this since for the common
// case the ack'ed packet will be at the head of the list
time::steady_clock::duration m = time::steady_clock::duration::zero();
if (m_history.size() == 0)
{
return (m); // No pending history, just exit
}
RttHistory &h = m_history.front();
if (!h.retx && ackSeq >= (h.seq + uint32_t(h.count)))
{ // Ok to use this sample
m = time::steady_clock::now() - h.time; // Elapsed time::nanoseconds
Measurement(m); // Log the measurement
ResetMultiplier(); // Reset multiplier on valid measurement
}
// Now delete all ack history with seq <= ack
while (m_history.size() > 0)
{
RttHistory &h = m_history.front();
if ((h.seq + uint32_t(h.count)) > ackSeq)
break; // Done removing
m_history.pop_front(); // Remove
}
return m;
}
void
RttEstimator::ClearSent()
{
// Clear all history entries
m_next = 1;
m_history.clear();
}
void
RttEstimator::IncreaseMultiplier()
{
m_multiplier = (m_multiplier * 2 < m_maxMultiplier) ? m_multiplier * 2 : m_maxMultiplier;
}
void
RttEstimator::ResetMultiplier()
{
m_multiplier = 1;
}
void
RttEstimator::Reset()
{
// Reset to initial state
m_next = 1;
m_currentEstimatedRtt = m_initialEstimatedRtt;
m_history.clear(); // Remove all info from the history
m_nSamples = 0;
ResetMultiplier();
}
}
}
}
+166
View File
@@ -0,0 +1,166 @@
#ifndef NDN_RTT_ESTIMATOR_H
#define NDN_RTT_ESTIMATOR_H
#include "core/common.hpp"
#include <deque>
namespace ndn
{
namespace cc
{
namespace client
{
/**
* \ingroup ndn-apps
*
* \brief Helper class to store RTT measurements
*/
class RttHistory
{
public:
RttHistory(uint32_t seq, uint32_t size, time::steady_clock::TimePoint time);
RttHistory(const RttHistory &h); // Copy constructor
public:
uint32_t seq; // First sequence number in packet sent
uint32_t count; // Number of bytes sent
time::steady_clock::TimePoint time; // Time this one was sent
bool retx; // True if this has been retransmitted
};
typedef std::deque<RttHistory> RttHistory_t;
/**
* \ingroup tcp
*
* \brief Base class for all RTT Estimators
*/
class RttEstimator
{
public:
RttEstimator();
RttEstimator(const RttEstimator &);
virtual ~RttEstimator();
/**
* \brief Note that a particular sequence has been sent
* \param seq the packet sequence number.
* \param size the packet size.
*/
virtual void
SentSeq(uint32_t seq, uint32_t size);
/**
* \brief Note that a particular ack sequence has been received
* \param ackSeq the ack sequence number.
* \return The measured RTT for this ack.
*/
virtual time::steady_clock::duration
AckSeq(uint32_t ackSeq);
/**
* \brief Clear all history entries
*/
virtual void
ClearSent();
/**
* \brief Add a new measurement to the estimator. Pure virtual function.
* \param t the new RTT measure.
*/
virtual void
Measurement(time::steady_clock::duration t) = 0;
/**
* \brief Returns the estimated RTO. Pure virtual function.
* \return the estimated RTO.
*/
virtual time::steady_clock::duration
RetransmitTimeout() = 0;
/**
* \brief Increase the estimation multiplier up to MaxMultiplier.
*/
virtual void
IncreaseMultiplier();
/**
* \brief Resets the estimation multiplier to 1.
*/
virtual void
ResetMultiplier();
/**
* \brief Resets the estimation to its initial state.
*/
virtual void
Reset();
/**
* \brief Sets the Minimum RTO.
* \param minRto The minimum RTO returned by the estimator.
*/
void
SetMinRto(time::steady_clock::duration minRto);
/**
* \brief Get the Minimum RTO.
* \return The minimum RTO returned by the estimator.
*/
time::steady_clock::duration
GetMinRto(void) const;
/**
* \brief Sets the Maximum RTO.
* \param minRto The maximum RTO returned by the estimator.
*/
void
SetMaxRto(time::steady_clock::duration maxRto);
/**
* \brief Get the Maximum RTO.
* \return The maximum RTO returned by the estimator.
*/
time::steady_clock::duration
GetMaxRto(void) const;
/**
* \brief Sets the current RTT estimate (forcefully).
* \param estimate The current RTT estimate.
*/
void
SetCurrentEstimate(time::steady_clock::duration estimate);
/**
* \brief gets the current RTT estimate.
* \return The current RTT estimate.
*/
time::steady_clock::duration
GetCurrentEstimate(void) const;
double
duration_to_second_double(time::steady_clock::duration d) {
return d.count() / 1000000000.0;
}
time::steady_clock::duration
second_double_to_duration(double d) {
return time::steady_clock::duration(static_cast<long long>(d * 1000000000.0));
}
private:
uint32_t m_next; // Next expected sequence to be sent
uint16_t m_maxMultiplier;
time::steady_clock::duration m_initialEstimatedRtt;
protected:
time::steady_clock::duration m_currentEstimatedRtt; // Current estimate
time::steady_clock::duration m_minRto; // minimum value of the timeout
time::steady_clock::duration m_maxRto; // maximum value of the timeout
uint32_t m_nSamples; // Number of samples
uint16_t m_multiplier; // RTO Multiplier
RttHistory_t m_history; // List of sent packet
};
}
}
}
#endif /* RTT_ESTIMATOR_H */
+119
View File
@@ -0,0 +1,119 @@
#include "ndn-rtt-mean-deviation.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
RttMeanDeviation::RttMeanDeviation()
: m_gain(0.125),
m_gain2(0.25),
m_variance(0)
{
}
RttMeanDeviation::RttMeanDeviation(const RttMeanDeviation &c)
: RttEstimator(c), m_gain(c.m_gain), m_gain2(c.m_gain2), m_variance(c.m_variance)
{
}
void
RttMeanDeviation::Measurement(time::steady_clock::duration m)
{
if (m_nSamples)
{ // Not first
time::steady_clock::duration err(m - m_currentEstimatedRtt);
double gErr = duration_to_second_double(err) * m_gain;
m_currentEstimatedRtt += second_double_to_duration(gErr);
auto abs_err = err > time::steady_clock::duration::zero() ? err : -err;
time::steady_clock::duration difference = abs_err - m_variance;
m_variance += second_double_to_duration(m_gain2 * duration_to_second_double(difference));
}
else
{ // First sample
m_currentEstimatedRtt = m; // Set estimate to current
// variance = sample / 2; // And variance to current / 2
// m_variance = m; // try this why????
m_variance = second_double_to_duration(duration_to_second_double(m) / 2);
}
m_nSamples++;
}
time::steady_clock::duration
RttMeanDeviation::RetransmitTimeout()
{
// std::cout << "RTO -> m_currentEstimatedRtt:" << duration_to_second_double(m_currentEstimatedRtt) << ", m_variance:" << duration_to_second_double(m_variance) << std::endl;
double retval = std::min(
duration_to_second_double(m_maxRto),
std::max(
m_multiplier * duration_to_second_double(m_minRto),
m_multiplier * (duration_to_second_double(m_currentEstimatedRtt) + 4 * duration_to_second_double(m_variance))));
return second_double_to_duration(retval);
}
void
RttMeanDeviation::Reset()
{
// Reset to initial state
m_variance = time::steady_clock::duration::zero();
RttEstimator::Reset();
}
void
RttMeanDeviation::Gain(double g)
{
m_gain = g;
}
void
RttMeanDeviation::SentSeq(uint32_t seq, uint32_t size)
{
RttHistory_t::iterator i;
for (i = m_history.begin(); i != m_history.end(); ++i)
{
if (seq == i->seq)
{ // Found it
i->retx = true;
break;
}
}
// Note that a particular sequence has been sent
if (i == m_history.end())
m_history.push_back(RttHistory(seq, size, time::steady_clock::now()));
}
time::steady_clock::duration
RttMeanDeviation::AckSeq(uint32_t ackSeq)
{
// An ack has been received, calculate rtt and log this measurement
// Note we use a linear search (O(n)) for this since for the common
// case the ack'ed packet will be at the head of the list
auto m = time::steady_clock::duration::zero();
if (m_history.size() == 0)
return (m); // No pending history, just exit
for (RttHistory_t::iterator i = m_history.begin(); i != m_history.end(); ++i)
{
if (ackSeq == i->seq)
{ // Found it
if (!i->retx)
{
m = time::steady_clock::now() - i->time; // Elapsed time
Measurement(m); // Log the measurement
ResetMultiplier(); // Reset multiplier on valid measurement
}
m_history.erase(i);
break;
}
}
return m;
}
}
}
}
+57
View File
@@ -0,0 +1,57 @@
#ifndef NDN_RTT_MEAN_DEVIATION_H
#define NDN_RTT_MEAN_DEVIATION_H
#include "ndn-rtt-estimator.hpp"
#endif /* NDN_RTT_MEAN_DEVIATION_H */
namespace ndn
{
namespace cc
{
namespace client
{
/**
* \ingroup ndn-apps
*
* \brief The modified version of "Mean--Deviation" RTT estimator, as discussed by Van Jacobson that
*better suits NDN communication model
*
* This class implements the "Mean--Deviation" RTT estimator, as discussed
* by Van Jacobson and Michael J. Karels, in
* "Congestion Avoidance and Control", SIGCOMM 88, Appendix A
*
*/
class RttMeanDeviation : public RttEstimator
{
public:
RttMeanDeviation();
RttMeanDeviation(const RttMeanDeviation &);
void
SentSeq(uint32_t seq, uint32_t size);
time::steady_clock::duration
AckSeq(uint32_t ackSeq);
void
Measurement(time::steady_clock::duration measure);
time::steady_clock::duration
RetransmitTimeout();
void
Reset();
void
Gain(double g);
private:
double m_gain; // Filter gain
double m_gain2; // Filter gain
time::steady_clock::duration m_variance; // Current variance
};
}
}
}
+288
View File
@@ -0,0 +1,288 @@
#include "pcon-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
PconConsumer::PconConsumer(Face &face, const Options &options)
: WindowConsumer(face, options),
m_ccAlgorithm(options.ccAlgorithm),
m_beta(options.beta),
m_addRttSuppress(options.addRttSuppress),
m_reactToCongestionMarks(options.reactToCongestionMarks),
m_useCwa(options.useCwa),
m_ssthresh(std::numeric_limits<double>::max()),
m_highData(0),
m_recPoint(0.0),
m_useCubicFastConv(options.useCubicFastConv),
m_cubicBeta(options.cubicBeta),
m_cubicWmax(0),
m_cubicLastWmax(0),
m_cubicLastDecrease(time::steady_clock::now()),
m_bicMinWin(0),
m_bicMaxWin(std::numeric_limits<double>::max()),
m_bicTargetWin(0),
m_bicSsCwnd(0),
m_bicSsTarget(0),
m_isBicSs(false)
{
}
void PconConsumer::onTimeout(uint32_t seq)
{
WindowDecrease();
if (m_inFlight > static_cast<uint32_t>(0))
{
m_inFlight--;
}
Consumer::onTimeout(seq);
}
void PconConsumer::onData(const Data &data, uint32_t seq, const time::steady_clock::TimePoint &sendTime)
{
Consumer::onData(data, seq, sendTime);
// Set highest received Data to sequence number
if (m_highData < seq)
{
m_highData = seq;
}
if (data.getCongestionMark() > 0)
{
std::cout << "Congestion Mark received: " << seq << std::endl;
if (m_reactToCongestionMarks)
{
WindowDecrease();
}
}
else
{
WindowIncrease();
}
if (m_inFlight > static_cast<uint32_t>(0))
{
m_inFlight--;
}
scheduleNextPacket();
}
void PconConsumer::WindowIncrease()
{
if (m_ccAlgorithm == CcAlgorithm::AIMD)
{
if (m_window < m_ssthresh)
{
m_window += 1.0;
}
else
{
m_window += (1.0 / m_window);
}
}
else if (m_ccAlgorithm == CcAlgorithm::CUBIC)
{
CubicIncrease();
}
else if (m_ccAlgorithm == CcAlgorithm::BIC)
{
BicIncrease();
}
else
{
BOOST_ASSERT_MSG(false, "Unknown CC Algorithm");
}
}
void PconConsumer::WindowDecrease()
{
if (!m_useCwa || m_highData > m_recPoint)
{
const double diff = m_nextSeq - m_highData;
BOOST_ASSERT(diff > 0);
m_recPoint = m_nextSeq + (m_addRttSuppress * diff);
if (m_ccAlgorithm == CcAlgorithm::AIMD)
{
// Normal TCP Decrease:
m_ssthresh = m_window * m_beta;
m_window = m_ssthresh;
}
else if (m_ccAlgorithm == CcAlgorithm::CUBIC)
{
CubicDecrease();
}
else if (m_ccAlgorithm == CcAlgorithm::BIC)
{
BicDecrease();
}
else
{
BOOST_ASSERT_MSG(false, "Unknown CC Algorithm");
}
// Window size cannot be reduced below initial size
if (m_window < m_initialWindowSize)
{
m_window = m_initialWindowSize;
}
}
}
void PconConsumer::BicIncrease()
{
if (m_window < BIC_LOW_WINDOW)
{
// Normal TCP AIMD behavior
if (m_window < m_ssthresh)
{
m_window = m_window + 1;
}
else
{
m_window = m_window + 1.0 / m_window;
}
}
else if (!m_isBicSs)
{
// Binary increase
if (m_bicTargetWin - m_window < BIC_MAX_INCREMENT)
{ // Binary search
m_window += (m_bicTargetWin - m_window) / m_window;
}
else
{
m_window += BIC_MAX_INCREMENT / m_window; // Additive increase
}
// FIX for equal double values.
if (m_window + 0.00001 < m_bicMaxWin)
{
m_bicMinWin = m_window;
m_bicTargetWin = (m_bicMaxWin + m_bicMinWin) / 2;
}
else
{
m_isBicSs = true;
m_bicSsCwnd = 1;
m_bicSsTarget = m_window + 1.0;
m_bicMaxWin = std::numeric_limits<double>::max();
}
}
else
{
// BIC slow start
m_window += m_bicSsCwnd / m_window;
if (m_window >= m_bicSsTarget)
{
m_bicSsCwnd = 2 * m_bicSsCwnd;
m_bicSsTarget = m_window + m_bicSsCwnd;
}
if (m_bicSsCwnd >= BIC_MAX_INCREMENT)
{
m_isBicSs = false;
}
}
}
void PconConsumer::BicDecrease()
{
// BIC Decrease
if (m_window >= BIC_LOW_WINDOW)
{
auto prev_max = m_bicMaxWin;
m_bicMaxWin = m_window;
m_window = m_window * m_cubicBeta;
m_bicMinWin = m_window;
if (prev_max > m_bicMaxWin)
{
// Fast Convergence
m_bicMaxWin = (m_bicMaxWin + m_bicMinWin) / 2;
}
m_bicTargetWin = (m_bicMaxWin + m_bicMinWin) / 2;
}
else
{
// Normal TCP Decrease:
m_ssthresh = m_window * m_cubicBeta;
m_window = m_ssthresh;
}
}
void PconConsumer::CubicIncrease()
{
// 1. Time since last congestion event in Seconds
const double t = time::duration_cast<time::microseconds>(
time::steady_clock::now() - m_cubicLastDecrease)
.count() /
1e6;
// 2. Time it takes to increase the window to cubic_wmax
// K = cubic_root(W_max*(1-beta_cubic)/C) (Eq. 2)
const double k = std::cbrt(m_cubicWmax * (1 - m_cubicBeta) / CUBIC_C);
// 3. Target: W_cubic(t) = C*(t-K)^3 + W_max (Eq. 1)
const double w_cubic = CUBIC_C * std::pow(t - k, 3) + m_cubicWmax;
// 4. Estimate of Reno Increase (Currently Disabled)
// const double rtt = m_rtt->GetCurrentEstimate().GetSeconds();
// const double w_est = m_cubic_wmax*m_beta + (3*(1-m_beta)/(1+m_beta)) * (t/rtt);
constexpr double w_est = 0.0;
// Actual adaptation
if (m_window < m_ssthresh)
{
m_window += 1.0;
}
else
{
BOOST_ASSERT(m_cubicWmax > 0);
double cubic_increment = std::max(w_cubic, w_est) - m_window;
// Cubic increment must be positive:
// Note: This change is not part of the RFC, but I added it to improve performance.
if (cubic_increment < 0)
{
cubic_increment = 0.0;
}
m_window += cubic_increment / m_window;
}
}
void PconConsumer::CubicDecrease()
{
// This implementation is ported from https://datatracker.ietf.org/doc/rfc8312/
const double FAST_CONV_DIFF = 1.0; // In percent
// A flow remembers the last value of W_max,
// before it updates W_max for the current congestion event.
// Current w_max < last_wmax
if (m_useCubicFastConv && m_window < m_cubicLastWmax * (1 - FAST_CONV_DIFF / 100))
{
m_cubicLastWmax = m_window;
m_cubicWmax = m_window * (1.0 + m_cubicBeta) / 2.0;
}
else
{
// Save old cwnd as w_max:
m_cubicLastWmax = m_window;
m_cubicWmax = m_window;
}
m_ssthresh = m_window * m_cubicBeta;
m_ssthresh = std::max<double>(m_ssthresh, m_initialWindowSize);
m_window = m_ssthresh;
m_cubicLastDecrease = time::steady_clock::now();
}
}
}
}
+74
View File
@@ -0,0 +1,74 @@
#include "window-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
class PconConsumer : public WindowConsumer
{
public:
explicit PconConsumer(Face &face, const Options &options);
virtual void onData(const Data &data, uint32_t seq, const time::steady_clock::TimePoint &sendTime);
virtual void onTimeout(uint32_t seq);
private:
void
WindowIncrease();
void
WindowDecrease();
void
CubicIncrease();
void
CubicDecrease();
void
BicIncrease();
void
BicDecrease();
private:
CcAlgorithm m_ccAlgorithm;
double m_beta;
double m_addRttSuppress;
bool m_reactToCongestionMarks;
bool m_useCwa;
double m_ssthresh;
uint32_t m_highData;
double m_recPoint;
// TCP CUBIC Parameters //
static constexpr double CUBIC_C = 0.4;
bool m_useCubicFastConv;
double m_cubicBeta;
double m_cubicWmax;
double m_cubicLastWmax;
time::steady_clock::TimePoint m_cubicLastDecrease;
// TCP BIC Parameters //
//! Regular TCP behavior (including slow start) until this window size
static constexpr uint32_t BIC_LOW_WINDOW = 14;
//! Sets the maximum (linear) increase of TCP BIC. Should be between 8 and 64.
static constexpr uint32_t BIC_MAX_INCREMENT = 16;
// BIC variables:
double m_bicMinWin; //!< last minimum cwnd
double m_bicMaxWin; //!< last maximum cwnd
double m_bicTargetWin;
double m_bicSsCwnd;
double m_bicSsTarget;
bool m_isBicSs; //!< whether we are currently in the BIC slow start phase
};
}
}
}
+194
View File
@@ -0,0 +1,194 @@
#include "core/common.hpp"
#include "core/version.hpp"
#include "pcon-consumer.hpp"
#include <iostream>
namespace ndn
{
namespace cc
{
namespace client
{
class Runner : noncopyable
{
public:
explicit Runner(const Options &options) : m_consumer(m_face, options)
{
m_consumer.afterFinish.connect([this]
{ this->cancel(); });
}
int
run()
{
try
{
m_consumer.start();
m_face.processEvents();
}
catch (const std::exception &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
// m_tracer.onError(e.what());
return 2;
}
return 0;
}
private:
void
cancel()
{
m_consumer.stop();
}
private:
Face m_face;
PconConsumer m_consumer;
};
static void
usage(const boost::program_options::options_description &options)
{
std::cout << "Usage: ndnping [options] ndn:/name/prefix\n"
"\n"
"Ping a NDN name prefix using Interests with name ndn:/name/prefix/ping/number.\n"
"The numbers in the Interests are randomly generated unless specified.\n"
"\n"
<< options;
exit(2);
}
static int
main(int argc, char *argv[])
{
Options options;
// options.shouldAllowStaleData = false;
// options.nPings = -1;
// options.interval = time::milliseconds(getDefaultPingInterval());
// options.timeout = time::milliseconds(getDefaultPingTimeoutThreshold());
// options.startSeq = 0;
// options.shouldGenerateRandomSeq = true;
// options.shouldPrintTimestamp = false;
// std::string identifier;
namespace po = boost::program_options;
po::options_description visibleOptDesc("Options");
visibleOptDesc.add_options()("help,h", "print this message and exit")("version,V", "display version and exit");
visibleOptDesc.add_options()(
"startSeq", po::value<uint32_t>(&options.startSeq)->default_value(0), "start sequence number");
visibleOptDesc.add_options()(
"seqMax", po::value<int64_t>(&options.seqMax)->default_value(-1), "maximum sequence number");
visibleOptDesc.add_options()(
"dsz", po::value<uint32_t>(&options.dsz)->default_value(8624), "data size");
visibleOptDesc.add_options()(
"delayStart", po::value<uint32_t>(&options.delayStart)->default_value(0), "delay start time, in milliseconds");
visibleOptDesc.add_options()(
"fixedRate", po::value<int32_t>(&options.fixedRate)->default_value(-1), "fixed rate, in milliseconds");
visibleOptDesc.add_options()(
"timingStop", po::value<int32_t>(&options.timingStop)->default_value(-1), "timing stop, in milliseconds");
visibleOptDesc.add_options()(
"delayGreedy", po::value<int32_t>(&options.delayGreedy)->default_value(-1), "delay greedy, in milliseconds");
visibleOptDesc.add_options()(
"greedyRate", po::value<int32_t>(&options.greedyRate)->default_value(-1), "greedy rate, in milliseconds");
visibleOptDesc.add_options()(
"lifetime", po::value<time::milliseconds::rep>()->default_value(4000), "Interest lifetime, in milliseconds");
visibleOptDesc.add_options()(
"schema", po::value<std::string>(&options.schema)->default_value("QSCCP"), "Schema, QSCCP or PCON");
visibleOptDesc.add_options()(
"initialWindowSize", po::value<uint32_t>(&options.initialWindowSize)->default_value(1), "Initial Window Size");
visibleOptDesc.add_options()(
"setInitialWindowOnTimeout", po::value<bool>(&options.setInitialWindowOnTimeout)->default_value(false), "Set initial window size on timeout");
visibleOptDesc.add_options()(
"ccAlgorithm", po::value<std::string>()->default_value("BIC"), "Specify which window adaptation algorithm to use (AIMD, BIC, or CUBIC)");
visibleOptDesc.add_options()(
"beta", po::value<double>(&options.beta)->default_value(0.5), "TCP Multiplicative Decrease factor");
visibleOptDesc.add_options()(
"cubicBeta", po::value<double>(&options.cubicBeta)->default_value(0.8), "TCP CUBIC Multiplicative Decrease factor");
visibleOptDesc.add_options()(
"addRttSuppress", po::value<double>(&options.addRttSuppress)->default_value(0.5), "Minimum number of RTTs (1 + this factor) between window decreases");
visibleOptDesc.add_options()(
"reactToCongestionMarks", po::value<bool>(&options.reactToCongestionMarks)->default_value(false), "React to congestion marks (ECN, CE)");
visibleOptDesc.add_options()(
"useCwa", po::value<bool>(&options.useCwa)->default_value(false), "Use Congestion Window Acceleration (CWA) algorithm");
visibleOptDesc.add_options()(
"useCubicFastConv", po::value<bool>(&options.useCubicFastConv)->default_value(true), "Use CUBIC fast convergence algorithm");
po::options_description hiddenOptDesc;
hiddenOptDesc.add_options()("prefix", po::value<std::string>(), "content prefix to request");
po::options_description optDesc;
optDesc.add(visibleOptDesc).add(hiddenOptDesc);
po::positional_options_description optPos;
optPos.add("prefix", -1);
try
{
po::variables_map optVm;
po::store(po::command_line_parser(argc, argv).options(optDesc).positional(optPos).run(), optVm);
po::notify(optVm);
if (optVm.count("help") > 0)
{
usage(visibleOptDesc);
}
if (optVm.count("version") > 0)
{
std::cout << "qsccp-client " << tools::VERSION << std::endl;
exit(0);
}
if (optVm.count("prefix") > 0)
{
options.prefix = Name(optVm["prefix"].as<std::string>());
}
else
{
std::cerr << "ERROR: No prefix specified" << std::endl;
usage(visibleOptDesc);
}
if (optVm.count("ccAlgorithm") > 0)
{
std::string algorithm_str = optVm["ccAlgorithm"].as<std::string>();
if (algorithm_str == "AIMD")
{
options.ccAlgorithm = CcAlgorithm::AIMD;
}
else if (algorithm_str == "BIC")
{
options.ccAlgorithm = CcAlgorithm::BIC;
}
else if (algorithm_str == "CUBIC")
{
options.ccAlgorithm = CcAlgorithm::CUBIC;
}
else
{
std::cerr << "ERROR: Not support CC Algorithm: " << algorithm_str << std::endl;
}
}
options.lifetime = time::milliseconds(optVm["lifetime"].as<time::milliseconds::rep>());
}
catch (const po::error &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
usage(visibleOptDesc);
}
std::cout << "PING " << options.prefix << std::endl;
return Runner(options).run();
}
}
}
}
int main(int argc, char *argv[])
{
return ndn::cc::client::main(argc, argv);
}
+115
View File
@@ -0,0 +1,115 @@
#include "window-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
WindowConsumer::WindowConsumer(Face &face, const Options &options)
: Consumer(face, options),
m_window(options.initialWindowSize),
m_inFlight(0),
m_initialWindowSize(options.initialWindowSize),
m_setInitialWindowOnTimeout(options.setInitialWindowOnTimeout),
fixedRate(options.fixedRate),
delayStart(options.delayStart),
timingStop(options.timingStop),
delayGreedy(options.delayGreedy),
greedyRate(options.greedyRate),
dsz(options.dsz)
{
}
void WindowConsumer::startGreedy()
{
this->fixedRate = this->greedyRate;
}
void WindowConsumer::scheduleNextPacket()
{
if (m_stopFlag)
{
return;
}
if (this->m_firstTime)
{
this->m_firstTime = false;
if (this->timingStop > 0)
{
m_scheduler.schedule(time::milliseconds(this->timingStop), [this]
{ stop(); });
}
if (this->delayGreedy > 0 && this->greedyRate > 0)
{
m_scheduler.schedule(time::milliseconds(this->delayGreedy), [this]
{ startGreedy(); });
}
m_nextInterestEvent = m_scheduler.schedule(time::milliseconds(this->delayStart), [this]
{ sendPacket(); });
return;
}
if (this->fixedRate > 0)
{
auto waitTime = (this->dsz * 1000000000) / this->fixedRate + 1;
m_nextInterestEvent = m_scheduler.schedule(time::nanoseconds(waitTime), [this]
{ sendPacket(); });
return;
}
if (m_window == static_cast<uint32_t>(0))
{
m_nextInterestEvent.cancel();
m_nextInterestEvent = m_scheduler.schedule(time::milliseconds(1000), [this]
{ sendPacket(); });
}
else if (m_inFlight >= m_window)
{
// simply do nothing
}
else if (!m_nextInterestEvent)
{
// if (m_nextInterestEvent)
// {
// std::cout << "???" << std::endl;
// m_nextInterestEvent.cancel();
// }
// uint32_t waitTime = 1 / m_window * 1000000000;
m_nextInterestEvent = m_scheduler.schedule(time::nanoseconds(0), [this]
{ sendPacket(); });
}
}
void WindowConsumer::onTimeout(uint32_t seq)
{
if (m_inFlight > static_cast<uint32_t>(0))
{
m_inFlight -= 1;
}
if (m_setInitialWindowOnTimeout)
{
m_window = m_initialWindowSize;
}
Consumer::onTimeout(seq);
}
void WindowConsumer::onData(const Data &data, uint32_t seq, const time::steady_clock::TimePoint &sendTime)
{
Consumer::onData(data, seq, sendTime);
m_window++;
if (m_inFlight > static_cast<uint32_t>(0))
{
m_inFlight--;
}
scheduleNextPacket();
}
void WindowConsumer::willSendInterest(uint32_t seq)
{
m_inFlight++;
Consumer::willSendInterest(seq);
}
}
}
}
+41
View File
@@ -0,0 +1,41 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
class WindowConsumer : public Consumer
{
public:
explicit WindowConsumer(Face &face, const Options &options);
virtual void onData(const Data &data, uint32_t seq, const time::steady_clock::TimePoint &sendTime);
virtual void onTimeout(uint32_t seq);
virtual void willSendInterest(uint32_t seq);
protected:
virtual void scheduleNextPacket();
void startGreedy();
protected:
double m_window;
uint32_t m_inFlight;
uint32_t m_initialWindowSize;
bool m_setInitialWindowOnTimeout;
int32_t fixedRate;
uint64_t delayStart;
int32_t timingStop;
int32_t delayGreedy;
int32_t greedyRate;
uint64_t dsz;
};
}
}
}
+157
View File
@@ -0,0 +1,157 @@
#include "ndn-producer.hpp"
#include "core/common.hpp"
#include "core/version.hpp"
namespace ndn
{
namespace cc
{
namespace server
{
namespace po = boost::program_options;
class Runner : noncopyable
{
public:
explicit Runner(const Options &options)
: m_options(options),
m_producer(m_face, m_keyChain, options)
{
m_producer.afterFinish.connect([this]
{
m_producer.stop();
});
}
int
run()
{
std::cout << "Producer SERVER " << m_options.prefix << std::endl;
try
{
// Interest interest(Name("/A/1"));
// interest.setCanBePrefix(false);
// interest.setMustBeFresh(true);
// interest.setServiceClass(0);
// m_producer.onInterest(interest);
m_producer.start();
m_face.processEvents();
}
catch (const std::exception &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
return 1;
}
return 0;
}
private:
const Options &m_options;
Face m_face;
KeyChain m_keyChain;
Producer m_producer;
};
static void
usage(std::ostream &os, const std::string &programName, const po::options_description &options)
{
os << "Usage: " << programName << " [options] <prefix>\n"
<< "\n"
<< "Starts a NDN ping server that responds to Interests under name ndn:<prefix>/ping\n"
<< "\n"
<< options;
}
static int
main(int argc, char *argv[])
{
Options options;
std::string prefix;
po::options_description visibleDesc("Options");
visibleDesc.add_options()("help,h", "print this help message and exit");
visibleDesc.add_options()("freshness,f",
po::value<time::milliseconds::rep>()->default_value(4000),
"FreshnessPeriod of the ping response, in milliseconds");
visibleDesc.add_options()("size,s",
po::value<uint32_t>()->default_value(1024),
"size of response payload");
visibleDesc.add_options()("version,V", "print program version and exit");
po::options_description hiddenDesc;
hiddenDesc.add_options()("prefix", po::value<std::string>(&prefix));
po::options_description optDesc;
optDesc.add(visibleDesc).add(hiddenDesc);
po::positional_options_description posDesc;
posDesc.add("prefix", -1);
po::variables_map vm;
try
{
po::store(po::command_line_parser(argc, argv).options(optDesc).positional(posDesc).run(), vm);
po::notify(vm);
}
catch (const po::error &e)
{
std::cerr << "ERROR: " << e.what() << "\n\n";
usage(std::cerr, argv[0], visibleDesc);
return 2;
}
catch (const boost::bad_any_cast &e)
{
std::cerr << "ERROR: " << e.what() << "\n\n";
usage(std::cerr, argv[0], visibleDesc);
return 2;
}
if (vm.count("help") > 0)
{
usage(std::cout, argv[0], visibleDesc);
return 0;
}
if (vm.count("version") > 0)
{
std::cout << "ndnpingserver " << tools::VERSION << std::endl;
return 0;
}
if (prefix.empty())
{
std::cerr << "ERROR: no name prefix specified\n\n";
usage(std::cerr, argv[0], visibleDesc);
return 2;
}
options.prefix = Name(prefix);
options.freshnessPeriod = time::milliseconds(vm["freshness"].as<time::milliseconds::rep>());
if (vm.count("_deprecated_freshness_") > 0)
{
options.freshnessPeriod = time::milliseconds(vm["_deprecated_freshness_"].as<time::milliseconds::rep>());
}
if (options.freshnessPeriod < 0_ms)
{
std::cerr << "ERROR: FreshnessPeriod cannot be negative" << std::endl;
return 2;
}
options.payloadSize = vm["size"].as<uint32_t>();
return Runner(options).run();
}
} // namespace server
} // namespace ping
} // namespace ndn
int main(int argc, char *argv[])
{
return ndn::cc::server::main(argc, argv);
}
+85
View File
@@ -0,0 +1,85 @@
#include "ndn-producer.hpp"
#include <ndn-cxx/signature.hpp>
#include <iostream>
namespace ndn
{
namespace cc
{
namespace server
{
Producer::Producer(Face &face, KeyChain &keyChain, const Options &options)
: m_face(face),
m_keyChain(keyChain),
m_options(options),
m_isRunning(false),
m_signature(0)
{
auto b = make_shared<Buffer>();
b->assign(m_options.payloadSize, 'a');
m_payload = Block(tlv::Content, std::move(b));
}
void Producer::run()
{
m_face.processEvents();
}
void Producer::start()
{
if (m_isRunning)
{
return;
}
m_isRunning = true;
m_face.setInterestFilter(m_options.prefix,
bind(&Producer::onInterest, this, _2),
[](const auto &, const auto &reason)
{
NDN_THROW(std::runtime_error("Failed to register prefix: " + reason));
});
}
void Producer::stop()
{
if (!m_isRunning)
{
return;
}
m_isRunning = false;
m_face.setInterestFilter(m_options.prefix, nullptr);
}
void Producer::onInterest(const Interest &interest)
{
afterReceive(interest.getName());
Data data(interest.getName());
if (interest.getServiceClass())
{
int serviceClass = *interest.getServiceClass();
data.setServiceClass(serviceClass);
}
// 设置 targetRate 为最大32位整数
data.setTargetRate(std::numeric_limits<uint32_t>::max());
data.setFreshnessPeriod(m_options.freshnessPeriod);
data.setContent(m_payload);
Signature signature;
SignatureInfo signatureInfo(static_cast<::ndn::tlv::SignatureTypeValue>(255));
signature.setInfo(signatureInfo);
signature.setValue(::ndn::makeNonNegativeIntegerBlock(::ndn::tlv::SignatureValue, m_signature));
data.setSignature(signature);
data.wireEncode();
// m_keyChain.sign(data);
m_face.put(data);
std::cout << "onInterest: " << interest.getName() << std::endl;
}
}
}
}
+73
View File
@@ -0,0 +1,73 @@
#include "core/common.hpp"
namespace ndn
{
namespace cc
{
namespace server
{
/**
* @brief Options for NDN producer
*/
struct Options
{
Name prefix; //!< prefix to register
time::milliseconds freshnessPeriod = 4_s; //!< data freshness period
uint32_t payloadSize = 0; //!< response payload size (0 == no payload)
};
class Producer : noncopyable
{
public:
Producer(Face &face, KeyChain &keyChain, const Options &options);
/**
* @brief Signals when Interest received
*
* @param name incoming interest name
*/
signal::Signal<Producer, Name> afterReceive;
/**
* @brief Signals when finished pinging
*/
signal::Signal<Producer> afterFinish;
void
run();
/**
* @brief starts the Interest filter
*
* @note This method is non-blocking and caller need to call face.processEvents()
*/
void
start();
/**
* @brief Unregister set interest filter
*/
void
stop();
public:
/**
* @brief Called when interest received
*
* @param interest incoming interest
*/
void
onInterest(const Interest &interest);
private:
const Options &m_options;
Face &m_face;
KeyChain &m_keyChain;
Block m_payload;
bool m_isRunning;
uint32_t m_signature;
RegisteredPrefixHandle m_registeredPrefix;
};
}
}
}
+53
View File
@@ -0,0 +1,53 @@
# -*- Mode: python; py-indent-offset: 4; indent-tabs-mode: nil; coding: utf-8; -*-
top = '../..'
def build(bld):
bld.objects(
target='qsccp-client-objects',
source=bld.path.ant_glob('client/*.cpp', excl='client/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/qsccp-client',
name='qsccp-client',
source='client/main.cpp',
use='qsccp-client-objects')
bld.objects(
target='pcon-client-objects',
source=bld.path.ant_glob('pcon/*.cpp', excl='pcon/pcon.cpp'),
use='core-objects')
bld.program(
target='../../bin/pcon-client',
name='pcon-client',
source='pcon/pcon.cpp',
use='pcon-client-objects')
bld.objects(
target='bbr-client-objects',
source=bld.path.ant_glob('bbr/*.cpp', excl='bbr/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/bbr-client',
name='bbr-client',
source='bbr/main.cpp',
use='bbr-client-objects')
bld.objects(
target='cc-server-objects',
source=bld.path.ant_glob('server/*.cpp', excl='server/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/cc-producer',
name='cc-producer',
source='server/main.cpp',
use='cc-server-objects')
## (for unit tests)
bld(target='cc-objects',
use='qsccp-client-objects cc-server-objects')
+42
View File
@@ -0,0 +1,42 @@
# -*- Mode: python; py-indent-offset: 4; indent-tabs-mode: nil; coding: utf-8; -*-
top = '../..'
def build(bld):
bld.objects(
target='qsccp-client-objects',
source=bld.path.ant_glob('client/*.cpp', excl='client/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/qsccp-client',
name='qsccp-client',
source='client/main.cpp',
use='qsccp-client-objects')
bld.objects(
target='pcon-client-objects',
source=bld.path.ant_glob('pcon/*.cpp', excl='pcon/pcon.cpp'),
use='core-objects')
bld.program(
target='../../bin/pcon-client',
name='pcon-client',
source='pcon/pcon.cpp',
use='pcon-client-objects')
bld.objects(
target='cc-server-objects',
source=bld.path.ant_glob('server/*.cpp', excl='server/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/cc-producer',
name='cc-producer',
source='server/main.cpp',
use='cc-server-objects')
## (for unit tests)
bld(target='cc-objects',
use='qsccp-client-objects cc-server-objects')