3 Commits

Author SHA1 Message Date
root 9648808618 fix(cc): delay need time 2023-12-18 20:57:24 +08:00
root 1675c0aca0 feat(cc): Update 2023-12-18 16:39:41 +08:00
SunnyQjm a23e941b13 feat(cc): Add cc-producer and cc-client 2023-12-15 14:57:02 +00:00
10 changed files with 935 additions and 0 deletions
+1
View File
@@ -68,6 +68,7 @@
#include <ndn-cxx/util/scheduler.hpp>
#include <ndn-cxx/util/signal.hpp>
#include <ndn-cxx/util/time.hpp>
#include <iostream>
namespace ndn {
+157
View File
@@ -0,0 +1,157 @@
#include "core/common.hpp"
#include "core/version.hpp"
#include "qsccp-consumer.hpp"
#include <iostream>
namespace ndn
{
namespace cc
{
namespace client
{
class Runner : noncopyable
{
public:
explicit Runner(const Options &options) : m_consumer(m_face, options)
{
m_consumer.afterFinish.connect([this]
{ this->cancel(); });
}
int
run()
{
try
{
m_consumer.start();
m_face.processEvents();
}
catch (const std::exception &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
// m_tracer.onError(e.what());
return 2;
}
return 0;
}
private:
void
cancel()
{
m_consumer.stop();
}
private:
Face m_face;
QsccpConsumer m_consumer;
};
static void
usage(const boost::program_options::options_description &options)
{
std::cout << "Usage: ndnping [options] ndn:/name/prefix\n"
"\n"
"Ping a NDN name prefix using Interests with name ndn:/name/prefix/ping/number.\n"
"The numbers in the Interests are randomly generated unless specified.\n"
"\n"
<< options;
exit(2);
}
static int
main(int argc, char *argv[])
{
Options options;
// options.shouldAllowStaleData = false;
// options.nPings = -1;
// options.interval = time::milliseconds(getDefaultPingInterval());
// options.timeout = time::milliseconds(getDefaultPingTimeoutThreshold());
// options.startSeq = 0;
// options.shouldGenerateRandomSeq = true;
// options.shouldPrintTimestamp = false;
// std::string identifier;
namespace po = boost::program_options;
po::options_description visibleOptDesc("Options");
visibleOptDesc.add_options()("help,h", "print this message and exit")("version,V", "display version and exit");
visibleOptDesc.add_options()(
"startSeq", po::value<uint64_t>(&options.startSeq)->default_value(0), "start sequence number");
visibleOptDesc.add_options()(
"seqMax", po::value<int64_t>(&options.seqMax)->default_value(-1), "maximum sequence number");
visibleOptDesc.add_options()(
"tos", po::value<uint32_t>(&options.tos)->default_value(5), "set the TOS field");
visibleOptDesc.add_options()(
"dsz", po::value<uint32_t>(&options.dsz)->default_value(8624), "data size");
visibleOptDesc.add_options()(
"delayStart", po::value<uint32_t>(&options.delayStart)->default_value(0), "delay start time, in milliseconds");
visibleOptDesc.add_options()(
"initialSendRate", po::value<uint32_t>(&options.initialSendRate)->default_value(0), "initial send rate, in milliseconds");
visibleOptDesc.add_options()(
"fixedRate", po::value<int32_t>(&options.fixedRate)->default_value(-1), "fixed rate, in milliseconds");
visibleOptDesc.add_options()(
"timingStop", po::value<int32_t>(&options.timingStop)->default_value(-1), "timing stop, in milliseconds");
visibleOptDesc.add_options()(
"delayGreedy", po::value<int32_t>(&options.delayGreedy)->default_value(-1), "delay greedy, in milliseconds");
visibleOptDesc.add_options()(
"greedyRate", po::value<int32_t>(&options.greedyRate)->default_value(-1), "greedy rate, in milliseconds");
visibleOptDesc.add_options()(
"lifetime", po::value<time::milliseconds::rep>()->default_value(4000), "Interest lifetime, in milliseconds");
po::options_description hiddenOptDesc;
hiddenOptDesc.add_options()("prefix", po::value<std::string>(), "content prefix to request");
po::options_description optDesc;
optDesc.add(visibleOptDesc).add(hiddenOptDesc);
po::positional_options_description optPos;
optPos.add("prefix", -1);
try
{
po::variables_map optVm;
po::store(po::command_line_parser(argc, argv).options(optDesc).positional(optPos).run(), optVm);
po::notify(optVm);
if (optVm.count("help") > 0)
{
usage(visibleOptDesc);
}
if (optVm.count("version") > 0)
{
std::cout << "qsccp-client " << tools::VERSION << std::endl;
exit(0);
}
if (optVm.count("prefix") > 0)
{
options.prefix = Name(optVm["prefix"].as<std::string>());
}
else
{
std::cerr << "ERROR: No prefix specified" << std::endl;
usage(visibleOptDesc);
}
options.lifetime = time::milliseconds(optVm["lifetime"].as<time::milliseconds::rep>());
}
catch (const po::error &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
usage(visibleOptDesc);
}
std::cout << "PING " << options.prefix << std::endl;
return Runner(options).run();
}
}
}
}
int main(int argc, char *argv[])
{
return ndn::cc::client::main(argc, argv);
}
+140
View File
@@ -0,0 +1,140 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
Consumer::Consumer(Face &face, const Options &options)
: m_options(options),
m_nSent(0),
m_nextSeq(options.startSeq),
m_seqMax(options.seqMax),
m_nOutstanding(0),
m_face(face),
m_scheduler(m_face.getIoService()),
m_stopFlag(false),
m_recvBytes(0),
m_traceTimes(0)
{
if (m_options.seqMax < 0)
{
m_seqMax = std::numeric_limits<uint64_t>::max();
}
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
}
void Consumer::traceRate()
{
if (!m_stopFlag)
{
m_scheduler.schedule(
time::milliseconds(500),
bind(&Consumer::traceRate, this));
}
m_traceTimes++;
// print rate
std::cout << "cc:rate:<" << m_traceTimes << "," << m_recvBytes << ">" << m_nSent << std::endl;
m_recvBytes = 0;
m_nSent = 0;
}
void Consumer::start()
{
m_stopFlag = false;
scheduleNextPacket();
}
void Consumer::stop()
{
m_nextInterestEvent.cancel();
m_stopFlag = true;
}
void Consumer::sendPacket()
{
uint32_t seq = std::numeric_limits<uint32_t>::max(); // invalid
while (m_retxSeqs.size())
{
seq = *m_retxSeqs.begin();
m_retxSeqs.erase(m_retxSeqs.begin());
break;
}
if (seq == std::numeric_limits<uint32_t>::max())
{
if (m_seqMax != std::numeric_limits<uint32_t>::max())
{
if (m_nextSeq >= m_seqMax)
{
return; // we are totally done
}
}
seq = m_nextSeq++;
}
Interest interest(makeInterestName(seq));
interest.setCanBePrefix(false);
interest.setMustBeFresh(true);
interest.setInterestLifetime(m_options.lifetime);
auto now = time::steady_clock::now();
m_face.expressInterest(interest,
bind(&Consumer::onData, this, _2, seq, now),
bind(&Consumer::onNack, this, _2, seq, now),
bind(&Consumer::onTimeout, this, seq));
++m_nSent;
++m_nOutstanding;
scheduleNextPacket();
// if (m_nSent < m_seqMax)
// {
// m_nextInterestEvent = m_scheduler.schedule(m_options.interval, [this]
// { scheduleNextInterest(); });
// }
// else
// {
// finish();
// }
}
void Consumer::onTimeout(uint64_t seq)
{
m_retxSeqs.insert(seq);
afterTimeout(seq);
scheduleNextPacket();
// finish();
}
void Consumer::onData(const Data &data, uint64_t seq, const time::steady_clock::TimePoint &sendTime)
{
auto now = time::steady_clock::now();
std::cout << "cc:delay:<" << (now - m_startTime).count() << "," << seq << "," << (now - sendTime).count() << ">" << std::endl;
afterData(seq, now - sendTime);
m_recvBytes += data.wireEncode().size();
// finish();
}
void Consumer::onNack(const lp::Nack &nack, uint64_t seq, const time::steady_clock::TimePoint &sendTime)
{
std::cout << "onNack: " << nack.getReason() << std::endl;
afterNack(seq, time::steady_clock::now() - sendTime, nack.getHeader());
scheduleNextPacket();
// finish();
}
Name Consumer::makeInterestName(uint64_t seq)
{
return Name(m_options.prefix).appendNumber(seq);
}
}
}
}
+90
View File
@@ -0,0 +1,90 @@
#include "core/common.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
typedef time::duration<double, time::milliseconds::period> Rtt;
class Options
{
public:
Name prefix;
uint64_t startSeq; // Initial sequence number
time::milliseconds lifetime; // Lifetime of Interest
int64_t seqMax; // Max sequence number
uint32_t tos; // Type of Service
uint32_t dsz; // Data size
int32_t reqNum; // Request Data Num(-1 indicator infinity)
uint32_t initialSendRate; // Initial Send Rate
int32_t fixedRate; // Fixed Send Rate
int32_t timingStop; // Timing Stop(-1 indicator infinity)
int32_t delayGreedy; // Start greedy after the specified time (ms)
int32_t greedyRate; // Greedy Send Rate(-1 indicator no greedy)
uint32_t delayStart; // Delay Start(ms)
};
class Consumer : noncopyable
{
public:
Consumer(Face &face, const Options &options);
virtual ~Consumer(){}
signal::Signal<Consumer, uint64_t, Rtt> afterData;
signal::Signal<Consumer, uint64_t, Rtt, lp::NackHeader> afterNack;
signal::Signal<Consumer, uint64_t> afterTimeout;
signal::Signal<Consumer> afterFinish;
void start();
void stop();
public:
Name makeInterestName(uint64_t seq);
virtual void sendPacket();
virtual void onTimeout(uint64_t seq);
virtual void onData(const Data& data, uint64_t seq, const time::steady_clock::TimePoint &sendTime);
virtual void onNack(const lp::Nack &nack, uint64_t seq, const time::steady_clock::TimePoint &sendTime);
protected:
virtual void scheduleNextPacket() = 0;
private:
void traceRate();
protected:
const Options &m_options;
int m_nSent;
uint64_t m_nextSeq;
int64_t m_seqMax;
int m_nOutstanding;
Face &m_face;
Scheduler m_scheduler;
int m_stopFlag;
uint64_t m_recvBytes;
uint64_t m_traceTimes;
time::steady_clock::TimePoint m_startTime;
/// @cond include_hidden
/**
* \struct This struct contains sequence numbers of packets to be retransmitted
*/
struct RetxSeqsContainer : public std::set<uint32_t>
{
};
RetxSeqsContainer m_retxSeqs; ///< \brief ordered set of sequence numbers to be retransmitted
scheduler::EventId m_nextInterestEvent;
};
}
}
}
+160
View File
@@ -0,0 +1,160 @@
#include "qsccp-consumer.hpp"
#include <iostream>
namespace ndn
{
namespace cc
{
namespace client
{
QsccpConsumer::QsccpConsumer(Face &face, const Options &options)
: Consumer(face, options),
m_firstTime(true),
tos(options.tos),
delayStart(options.delayStart),
dsz(options.dsz),
sendRate(options.initialSendRate),
fixedRate(options.fixedRate),
timingStop(options.timingStop),
delayGreedy(options.delayGreedy),
greedyRate(options.greedyRate),
m_recvDataNum(0)
{
}
uint64_t QsccpConsumer::updateRate(uint64_t newRate)
{
if (this->fixedRate > 0)
{
return this->fixedRate;
}
if (this->sendRate == 0)
{
this->sendRate = newRate;
}
else
{
this->sendRate = 0.8 * this->sendRate + 0.2 * newRate;
}
return this->sendRate;
}
void QsccpConsumer::startGreedy()
{
this->fixedRate = this->greedyRate;
this->sendRate = this->greedyRate;
}
void QsccpConsumer::onData(const Data &data, uint64_t seq, const time::steady_clock::TimePoint &sendTime)
{
// print tags
auto targetRate = data.getTargetRate();
// std::cout << "onData: " << seq << ", targetRate: " << *targetRate << std::endl;
if (targetRate)
{
this->updateRate(*targetRate);
}
else
{
// NS_LOG_DEBUG("no target rate");
}
scheduleNextPacket();
Consumer::onData(data, seq, sendTime);
++this->m_recvDataNum;
if (this->m_seqMax == this->m_recvDataNum)
{
this->stop();
return;
}
}
void QsccpConsumer::scheduleNextPacket()
{
if (m_stopFlag)
{
return;
}
if (this->m_firstTime)
{
m_startTime = time::steady_clock::now();
this->m_firstTime = false;
if (this->fixedRate > 0)
{
this->sendRate = fixedRate;
}
if (this->timingStop > 0)
{
m_scheduler.schedule(time::milliseconds(this->timingStop), [this]
{
stop(); });
}
if (this->delayGreedy > 0 && this->greedyRate > 0)
{
m_scheduler.schedule(time::milliseconds(this->delayGreedy), [this]
{ startGreedy(); });
}
m_nextInterestEvent = m_scheduler.schedule(time::milliseconds(this->delayStart), [this]
{ sendPacket(); });
}
else if (!m_nextInterestEvent)
{
if (this->sendRate <= 0)
{
return;
}
auto waitTime = (this->dsz * 1000000000) / this->sendRate + 1;
m_nextInterestEvent = m_scheduler.schedule(time::nanoseconds(waitTime), [this]
{ sendPacket(); });
}
}
void QsccpConsumer::sendPacket()
{
scheduleNextPacket();
// send packet here
uint32_t seq = std::numeric_limits<uint32_t>::max(); // invalid
while (m_retxSeqs.size())
{
seq = *m_retxSeqs.begin();
m_retxSeqs.erase(m_retxSeqs.begin());
break;
}
if (seq == std::numeric_limits<uint32_t>::max())
{
if (m_seqMax != std::numeric_limits<uint32_t>::max())
{
if (m_nextSeq >= m_seqMax)
{
stop();
// finish();
return; // we are totally done
}
}
seq = m_nextSeq++;
}
Interest interest(makeInterestName(seq));
interest.setCanBePrefix(false);
interest.setMustBeFresh(true);
interest.setServiceClass(this->tos);
interest.setDsz(this->dsz);
interest.setInterestLifetime(m_options.lifetime);
auto now = time::steady_clock::now();
m_face.expressInterest(interest,
bind(&Consumer::onData, this, _2, seq, now),
bind(&Consumer::onNack, this, _2, seq, now),
bind(&Consumer::onTimeout, this, seq));
++m_nSent;
++m_nOutstanding;
}
}
}
}
+41
View File
@@ -0,0 +1,41 @@
#include "ndn-consumer.hpp"
namespace ndn
{
namespace cc
{
namespace client
{
class QsccpConsumer : public Consumer
{
public:
explicit QsccpConsumer(Face &face, const Options &options);
void sendPacket() override;
void onData(const Data& data, uint64_t seq, const time::steady_clock::TimePoint &sendTime) override;
protected:
void scheduleNextPacket() override;
void startGreedy();
private:
uint64_t updateRate(uint64_t newRate);
private:
bool m_firstTime = true;
// private attribute here
uint64_t tos;
uint64_t delayStart;
uint64_t dsz;
uint64_t sendRate;
int32_t fixedRate;
int32_t timingStop;
int32_t delayGreedy;
int32_t greedyRate;
uint64_t m_recvDataNum;
};
}
}
}
+157
View File
@@ -0,0 +1,157 @@
#include "ndn-producer.hpp"
#include "core/common.hpp"
#include "core/version.hpp"
namespace ndn
{
namespace cc
{
namespace server
{
namespace po = boost::program_options;
class Runner : noncopyable
{
public:
explicit Runner(const Options &options)
: m_options(options),
m_producer(m_face, m_keyChain, options)
{
m_producer.afterFinish.connect([this]
{
m_producer.stop();
});
}
int
run()
{
std::cout << "Producer SERVER " << m_options.prefix << std::endl;
try
{
// Interest interest(Name("/A/1"));
// interest.setCanBePrefix(false);
// interest.setMustBeFresh(true);
// interest.setServiceClass(0);
// m_producer.onInterest(interest);
m_producer.start();
m_face.processEvents();
}
catch (const std::exception &e)
{
std::cerr << "ERROR: " << e.what() << std::endl;
return 1;
}
return 0;
}
private:
const Options &m_options;
Face m_face;
KeyChain m_keyChain;
Producer m_producer;
};
static void
usage(std::ostream &os, const std::string &programName, const po::options_description &options)
{
os << "Usage: " << programName << " [options] <prefix>\n"
<< "\n"
<< "Starts a NDN ping server that responds to Interests under name ndn:<prefix>/ping\n"
<< "\n"
<< options;
}
static int
main(int argc, char *argv[])
{
Options options;
std::string prefix;
po::options_description visibleDesc("Options");
visibleDesc.add_options()("help,h", "print this help message and exit");
visibleDesc.add_options()("freshness,f",
po::value<time::milliseconds::rep>()->default_value(4000),
"FreshnessPeriod of the ping response, in milliseconds");
visibleDesc.add_options()("size,s",
po::value<uint32_t>()->default_value(1024),
"size of response payload");
visibleDesc.add_options()("version,V", "print program version and exit");
po::options_description hiddenDesc;
hiddenDesc.add_options()("prefix", po::value<std::string>(&prefix));
po::options_description optDesc;
optDesc.add(visibleDesc).add(hiddenDesc);
po::positional_options_description posDesc;
posDesc.add("prefix", -1);
po::variables_map vm;
try
{
po::store(po::command_line_parser(argc, argv).options(optDesc).positional(posDesc).run(), vm);
po::notify(vm);
}
catch (const po::error &e)
{
std::cerr << "ERROR: " << e.what() << "\n\n";
usage(std::cerr, argv[0], visibleDesc);
return 2;
}
catch (const boost::bad_any_cast &e)
{
std::cerr << "ERROR: " << e.what() << "\n\n";
usage(std::cerr, argv[0], visibleDesc);
return 2;
}
if (vm.count("help") > 0)
{
usage(std::cout, argv[0], visibleDesc);
return 0;
}
if (vm.count("version") > 0)
{
std::cout << "ndnpingserver " << tools::VERSION << std::endl;
return 0;
}
if (prefix.empty())
{
std::cerr << "ERROR: no name prefix specified\n\n";
usage(std::cerr, argv[0], visibleDesc);
return 2;
}
options.prefix = Name(prefix);
options.freshnessPeriod = time::milliseconds(vm["freshness"].as<time::milliseconds::rep>());
if (vm.count("_deprecated_freshness_") > 0)
{
options.freshnessPeriod = time::milliseconds(vm["_deprecated_freshness_"].as<time::milliseconds::rep>());
}
if (options.freshnessPeriod < 0_ms)
{
std::cerr << "ERROR: FreshnessPeriod cannot be negative" << std::endl;
return 2;
}
options.payloadSize = vm["size"].as<uint32_t>();
return Runner(options).run();
}
} // namespace server
} // namespace ping
} // namespace ndn
int main(int argc, char *argv[])
{
return ndn::cc::server::main(argc, argv);
}
+85
View File
@@ -0,0 +1,85 @@
#include "ndn-producer.hpp"
#include <ndn-cxx/signature.hpp>
#include <iostream>
namespace ndn
{
namespace cc
{
namespace server
{
Producer::Producer(Face &face, KeyChain &keyChain, const Options &options)
: m_face(face),
m_keyChain(keyChain),
m_options(options),
m_isRunning(false),
m_signature(0)
{
auto b = make_shared<Buffer>();
b->assign(m_options.payloadSize, 'a');
m_payload = Block(tlv::Content, std::move(b));
}
void Producer::run()
{
m_face.processEvents();
}
void Producer::start()
{
if (m_isRunning)
{
return;
}
m_isRunning = true;
m_face.setInterestFilter(m_options.prefix,
bind(&Producer::onInterest, this, _2),
[](const auto &, const auto &reason)
{
NDN_THROW(std::runtime_error("Failed to register prefix: " + reason));
});
}
void Producer::stop()
{
if (!m_isRunning)
{
return;
}
m_isRunning = false;
m_face.setInterestFilter(m_options.prefix, nullptr);
}
void Producer::onInterest(const Interest &interest)
{
afterReceive(interest.getName());
Data data(interest.getName());
if (interest.getServiceClass())
{
int serviceClass = *interest.getServiceClass();
data.setServiceClass(serviceClass);
}
// 设置 targetRate 为最大32位整数
data.setTargetRate(std::numeric_limits<uint32_t>::max());
data.setFreshnessPeriod(m_options.freshnessPeriod);
data.setContent(m_payload);
Signature signature;
SignatureInfo signatureInfo(static_cast<::ndn::tlv::SignatureTypeValue>(255));
signature.setInfo(signatureInfo);
signature.setValue(::ndn::makeNonNegativeIntegerBlock(::ndn::tlv::SignatureValue, m_signature));
data.setSignature(signature);
data.wireEncode();
// m_keyChain.sign(data);
m_face.put(data);
std::cout << "onInterest: " << interest.getName() << std::endl;
}
}
}
}
+73
View File
@@ -0,0 +1,73 @@
#include "core/common.hpp"
namespace ndn
{
namespace cc
{
namespace server
{
/**
* @brief Options for NDN producer
*/
struct Options
{
Name prefix; //!< prefix to register
time::milliseconds freshnessPeriod = 4_s; //!< data freshness period
uint32_t payloadSize = 0; //!< response payload size (0 == no payload)
};
class Producer : noncopyable
{
public:
Producer(Face &face, KeyChain &keyChain, const Options &options);
/**
* @brief Signals when Interest received
*
* @param name incoming interest name
*/
signal::Signal<Producer, Name> afterReceive;
/**
* @brief Signals when finished pinging
*/
signal::Signal<Producer> afterFinish;
void
run();
/**
* @brief starts the Interest filter
*
* @note This method is non-blocking and caller need to call face.processEvents()
*/
void
start();
/**
* @brief Unregister set interest filter
*/
void
stop();
public:
/**
* @brief Called when interest received
*
* @param interest incoming interest
*/
void
onInterest(const Interest &interest);
private:
const Options &m_options;
Face &m_face;
KeyChain &m_keyChain;
Block m_payload;
bool m_isRunning;
uint32_t m_signature;
RegisteredPrefixHandle m_registeredPrefix;
};
}
}
}
+31
View File
@@ -0,0 +1,31 @@
# -*- Mode: python; py-indent-offset: 4; indent-tabs-mode: nil; coding: utf-8; -*-
top = '../..'
def build(bld):
bld.objects(
target='qsccp-client-objects',
source=bld.path.ant_glob('client/*.cpp', excl='client/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/qsccp-client',
name='qsccp-client',
source='client/main.cpp',
use='qsccp-client-objects')
bld.objects(
target='cc-server-objects',
source=bld.path.ant_glob('server/*.cpp', excl='server/main.cpp'),
use='core-objects')
bld.program(
target='../../bin/cc-producer',
name='cc-producer',
source='server/main.cpp',
use='cc-server-objects')
## (for unit tests)
bld(target='cc-objects',
use='qsccp-client-objects cc-server-objects')