feat: Add qsccp sched

This commit is contained in:
2023-12-03 17:39:53 +08:00
parent 9b877c3a23
commit 9bc57b880b
6 changed files with 990 additions and 5 deletions
+42
View File
@@ -0,0 +1,42 @@
#include "TokenBucket.hpp"
#include "common/global.hpp"
#include "common/logger.hpp"
#include <functional>
namespace nfd {
namespace face {
NFD_LOG_INIT(QSCCPTokenBucket);
TokenBucket::TokenBucket(uint64_t rateLimit, uint64_t bucketSize, const std::string& name): rateLimit(rateLimit), bucketSize(bucketSize), name(name) {
this->eachStep = this->rateLimit / 250.0;
this->currentTokenSize = 0;
this->isStart = false;
// NFD_LOG_DEBUG("Create tokenBucket: " << name << ", eachStep: " << this->eachStep << ", rateLimit: " << this->rateLimit);
}
void TokenBucket::start() {
if (this->isStart) {
return;
}
// 每 4ms 增加一次令牌
getScheduler().schedule(time::milliseconds(4), std::bind(&TokenBucket::generateToken, this));
}
void TokenBucket::generateToken() {
// NFD_LOG_DEBUG(this->name << "=> do generateToken => " << this->currentTokenSize);
if (this->currentTokenSize + this->eachStep > this->bucketSize) {
this->currentTokenSize = this->bucketSize;
} else {
this->currentTokenSize += this->eachStep;
}
getScheduler().schedule(time::milliseconds(4), std::bind(&TokenBucket::generateToken, this));
}
bool TokenBucket::use(uint64_t tokenNum) {
if(this->currentTokenSize > tokenNum) {
this->currentTokenSize -= tokenNum;
// NFD_LOG_DEBUG(this->name << "=> use success => " << this->currentTokenSize);
return true;
} else {
// NFD_LOG_DEBUG(this->name << "=> use failed => " << this->currentTokenSize);
return false;
}
}
}
}
+34
View File
@@ -0,0 +1,34 @@
//
// Created by gdcni21 on 3/25/22.
//
#ifndef NS_TOKENBUCKET_H
#define NS_TOKENBUCKET_H
#include <memory>
#include <string>
namespace nfd {
namespace face {
class TokenBucket {
public:
explicit TokenBucket(uint64_t rateLimit, uint64_t bucketSize, const std::string& name);
void start();
/**
* 周期性增加token
**/
void generateToken();
/**
* 尝试消费n个token
* @param tokenNum
* @return
*/
bool use(uint64_t tokenNum);
public:
double eachStep; // 每个周期增加的令牌
double currentTokenSize; // 当前令牌数
uint64_t rateLimit; // 令牌生成速率
uint64_t bucketSize; // 令牌桶容量
std::string name;
bool isStart; // 标识是否启动
};
}
}
#endif //NS_TOKENBUCKET_H
+63 -5
View File
@@ -24,7 +24,7 @@
*/
#include "generic-link-service.hpp"
#include <iostream>
#include <ndn-cxx/lp/fields.hpp>
#include <ndn-cxx/lp/pit-token.hpp>
#include <ndn-cxx/lp/tags.hpp>
@@ -47,10 +47,41 @@ GenericLinkService::GenericLinkService(const GenericLinkService::Options& option
, m_lastSeqNo(-2)
, m_nextMarkTime(time::steady_clock::time_point::max())
, m_nMarkedSinceInMarkingState(0)
, globalSp(options.C * 0.99, options.vqSize)
{
m_reassembler.beforeTimeout.connect([this] (auto&&...) { ++nReassemblyTimeouts; });
m_reliability.onDroppedInterest.connect([this] (const auto& i) { notifyDroppedInterest(i); });
nReassembling.observe(&m_reassembler);
this->sendPacketCallback = [this](std::shared_ptr<VirtualQueueItem> taskPtr) {
if (taskPtr != nullptr) {
const Block &block = taskPtr->item->wireEncode();
// std::cout << "interest wireEncode size: " << block.size() << std::endl;
lp::Packet lpPacket(block);
// std::cout << "Interest lppacket size before encode lp fields: " << lpPacket.wireEncode().size() << std::endl;
encodeLpFields(*(taskPtr->item), lpPacket);
// std::cout << "Interest lppacket size after encode lp fields: " << lpPacket.wireEncode().size() << std::endl;
this->sendNetPacket(std::move(lpPacket), true);
// lp::Packet lpPacket(taskPtr->item->wireEncode());
// encodeLpFields(*(taskPtr->item), lpPacket);
// this->sendNetPacket(std::move(lpPacket), taskPtr->endpointId, true);
}
// 得到下一个需要调度的包
auto newTaskPtr = this->globalSp.scheduleNext();
if (newTaskPtr == nullptr) {
//NFD_LOG_ERROR("FUCK");
// 如果没有可调度的包,则 1 ms 之后再调度
this->canScheduler = true;
// getScheduler().schedule(time::milliseconds(1), std::bind(this->sendPacketCallback, nullptr));
} else {
// 等待足够的 token 再发送
this->canScheduler = false;
// NFD_LOG_DEBUG("Route waitTime: " << newTaskPtr->waitNsTime);
getScheduler().schedule(time::nanoseconds((uint32_t)(newTaskPtr->waitNsTime)),
std::bind(this->sendPacketCallback, newTaskPtr));
}
};
// getScheduler().schedule(time::nanoseconds(0), std::bind(this->sendPacketCallback, nullptr));
}
void
@@ -115,19 +146,46 @@ GenericLinkService::sendLpPacket(lp::Packet&& pkt)
void
GenericLinkService::doSendInterest(const Interest& interest)
{
lp::Packet lpPacket(interest.wireEncode());
if (interest.getName().getPrefix(1).toUri() == "/localhost") {
const Block &block = interest.wireEncode();
// // std::cout << "interest wireEncode size: " << block.size() << std::endl;
lp::Packet lpPacket(block);
// // std::cout << "lppacket size before encode lp fields: " << lpPacket.wireEncode().size() << std::endl;
encodeLpFields(interest, lpPacket);
// // std::cout << "lppacket size after encode lp fields: " << lpPacket.wireEncode().size() << std::endl;
this->sendNetPacket(std::move(lpPacket), true);
} else {
this->globalSp.appendInterest(interest);
if (this->canScheduler) {
this->sendPacketCallback(nullptr);
}
//if (!success) {
// // send Nack
// lp::Nack nack(interest);
// nack.setReason(lp::NackReason::CONGESTION);
// if (this->nwfq.canMark(nack)) {
// // 拥塞标记
// nack.getInterest().setTag(make_shared<lp::CongestionMarkTag>(1));
// }
// this->receiveNack(nack, endpointId);
//}
}
// lp::Packet lpPacket(interest.wireEncode());
encodeLpFields(interest, lpPacket);
// encodeLpFields(interest, lpPacket);
this->sendNetPacket(std::move(lpPacket), true);
// this->sendNetPacket(std::move(lpPacket), true);
}
void
GenericLinkService::doSendData(const Data& data)
{
// const Block& block = data.wireEncode();
// std::cout << "Data wireEncode size: " << block.size() << std::endl;
lp::Packet lpPacket(data.wireEncode());
// std::cout << "Data before encode LpFields size: " << lpPacket.wireEncode().size() << std::endl;
encodeLpFields(data, lpPacket);
// std::cout << "Data After encode LpFields size: " << lpPacket.wireEncode().size() << std::endl;
this->sendNetPacket(std::move(lpPacket), false);
}
+8
View File
@@ -30,6 +30,7 @@
#include "lp-fragmenter.hpp"
#include "lp-reassembler.hpp"
#include "lp-reliability.hpp"
#include "queues.hpp"
namespace nfd::face {
@@ -164,6 +165,9 @@ public:
* being set with canOverrideMtuTo().
*/
ssize_t overrideMtu = std::numeric_limits<ssize_t>::max();
uint64_t C = 10000000;
size_t vqSize = 20;
};
/** \brief %Counters provided by GenericLinkService.
@@ -316,6 +320,10 @@ NFD_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
/// number of marked packets in the current incident of congestion
size_t m_nMarkedSinceInMarkingState;
GlobalSP globalSp;
bool canScheduler = true;
std::function<void(std::shared_ptr<VirtualQueueItem>)> sendPacketCallback;
friend LpReliability;
};
+505
View File
@@ -0,0 +1,505 @@
#include "queues.hpp"
#include "common/global.hpp"
#include <ndn-cxx/encoding/estimator.hpp>
namespace nfd
{
namespace face
{
NFD_LOG_INIT(QSCCPQueue);
bool VirtualQueue::push(int seq, const std::shared_ptr<const Interest> &interest,
uint64_t dataSize, bool needInsert)
{
this->recvBits += dataSize;
if (!needInsert)
{
return false;
}
if (this->currentSize >= this->size)
{
return false;
}
this->lastSeq = seq;
VirtualQueueItem vqi;
vqi.seq = seq;
vqi.item = interest;
vqi.queue = shared_from_this();
vqi.dataSize = dataSize;
this->innerQueue.push_back(vqi);
this->currentSize++;
auto serviceClass = interest->getServiceClass();
if (serviceClass && *serviceClass > 7)
{
this->penaltyFactor = 0.5;
}
return true;
}
WDRRQueue::WDRRQueue(uint32_t rateLimit, uint32_t availableBandwidth, double weight, const std::string &name,
uint32_t tos, size_t vqSize) : availableBandwidth(availableBandwidth),
weight(weight), name(name), tos(tos), vqSize(vqSize)
{
if (rateLimit > 0)
{
this->needRateLimit = true;
this->tokenBucket = std::make_shared<TokenBucket>(rateLimit, 20 * 1000 * 8, name);
this->tokenBucket->start();
}
else
{
this->needRateLimit = false;
}
}
void WDRRQueue::updateAvailableBandwidth(uint32_t C)
{
this->availableBandwidth = C;
}
bool WDRRQueue::appendInterest(const Interest &interest)
{
std::string key = interest.getName().getPrefix(1).toUri();
if (this->virtualQueues.count(key) == 0)
{
this->virtualQueues[key] = std::make_shared<VirtualQueue>(1, this->vqSize);
this->virtualQueues[key]->tos = this->tos;
this->flows.push_back(key);
this->totalWeight += 1;
for (auto &item : this->virtualQueues)
{
item.second->objectRate = this->availableBandwidth * item.second->weight / this->totalWeight;
item.second->objectRate = item.second->objectRate * item.second->penaltyFactor;
}
}
bool needInsert = true;
if (this->cdt > 0 && this->currentPacketNum >= this->cdt)
{
needInsert = false;
}
uint64_t dataSize = 9000;
auto dsz = interest.getDsz();
if (dsz)
{
dataSize = *dsz;
}
this->recvBits += dataSize;
auto success = this->virtualQueues[key]->push(0, interest.shared_from_this(), dataSize, needInsert);
if (success)
{
this->currentPacketNum++;
}
return success;
}
std::shared_ptr<VirtualQueueItem> WDRRQueue::scheduleNext()
{
if (this->nextScheduleQueue >= this->flows.size())
{
for (auto &key : this->flows)
{
if (this->virtualQueues.count(key) == 0)
{
continue;
}
auto &item = this->virtualQueues[key];
item->balance += item->weight * item->penaltyFactor * this->MTU_QUANTA;
}
this->nextScheduleQueue = 0;
}
std::shared_ptr<VirtualQueue> selectedVQ = nullptr;
for (; this->nextScheduleQueue < this->flows.size(); this->nextScheduleQueue++)
{
auto &item = this->virtualQueues[this->flows[this->nextScheduleQueue]];
if (item->empty())
{
item->balance = 0;
continue;
}
if (item->front().dataSize < item->balance)
{
selectedVQ = item;
break;
}
if (item->len() > 10)
{
}
}
if (selectedVQ == nullptr && this->currentPacketNum > 0)
{
return scheduleNext();
}
if (selectedVQ != nullptr)
{
if (needRateLimit && !this->tokenBucket->use(selectedVQ->front().dataSize))
{
return nullptr;
}
auto item = std::make_shared<VirtualQueueItem>(selectedVQ->front());
selectedVQ->balance -= item->dataSize;
selectedVQ->pop();
if (selectedVQ->penaltyFactor < 0.9)
{
item->item->setServiceClass(selectedVQ->tos + 8);
}
this->sendBits += item->dataSize;
this->currentPacketNum--;
return item;
}
return nullptr;
}
uint32_t WDRRQueue::calObjectRate(bool getCurrentRateOnly)
{
uint32_t currentRate = this->recvBits * 1000 / this->calObjectRateInterval;
uint32_t currentSendRate = this->sendBits * 1000 / this->calObjectRateInterval;
if (getCurrentRateOnly)
{
return currentRate;
}
bool hasCleanFlow = false;
for (auto it = this->virtualQueues.begin(); it != this->virtualQueues.end();)
{
if (it->second->recvBits == 0 && it->second->empty())
{
this->totalWeight -= it->second->weight;
this->virtualQueues.erase(it++); // here is the key
hasCleanFlow = true;
}
else
{
it++;
}
}
if (hasCleanFlow)
{
this->flows.resize(this->virtualQueues.size());
int idx = 0;
for (auto it = this->virtualQueues.begin(); it != this->virtualQueues.end(); it++, idx++)
{
this->flows[idx] = it->first;
}
}
double totalHightSpeedFlowWeight = 0;
int hightSppedFlowNum = 0;
uint32_t consumeRate = 0;
double totalWeightThisTerm = 0;
for (auto &item : this->virtualQueues)
{
totalWeightThisTerm += (item.second->weight * item.second->penaltyFactor);
}
for (auto &item : this->virtualQueues)
{
item.second->objectRate = this->availableBandwidth * item.second->weight * item.second->penaltyFactor / totalWeightThisTerm;
item.second->currentFlowRate = item.second->recvBits * 1000 / this->calObjectRateInterval;
item.second->currentRatio = item.second->currentFlowRate * 1.0 / item.second->objectRate;
auto targetHighFlowRate = (item.second->objectRate - 500000) * 1.0 / item.second->objectRate;
if (item.second->currentRatio > targetHighFlowRate)
{
if (item.second->penaltyFactor > 0.99)
{
hightSppedFlowNum++;
totalHightSpeedFlowWeight += item.second->weight;
}
consumeRate += item.second->objectRate;
}
else
{
consumeRate += item.second->currentFlowRate;
}
NFD_LOG_DEBUG(this->name << " => currentRate: " << item.second->currentFlowRate << ", consumerRate: " << consumeRate << ", objectRate: " << item.second->objectRate << ", availableBandwidth" << this->availableBandwidth << ", weight: " << item.second->weight << ", totalWeight: " << this->totalWeight << ", currentSendRate: " << currentSendRate);
item.second->recvBits = 0;
}
if (consumeRate < this->availableBandwidth && hightSppedFlowNum > 0)
{
auto remainRate = this->availableBandwidth - consumeRate;
for (auto &item : this->virtualQueues)
{
if (item.second->penaltyFactor > 0.99 && item.second->currentRatio > 0.8)
{
item.second->objectRate += (remainRate * item.second->weight / totalHightSpeedFlowWeight);
}
NFD_LOG_DEBUG(this->name << " => objectRate: " << item.second->objectRate << ", remainRate: " << remainRate << ", item.weight: " << item.second->weight << ", totalWeight: " << totalHightSpeedFlowWeight);
}
}
for (auto &item : this->virtualQueues)
{
if (item.second->currentFlowRate * 1.0 / item.second->objectRate > item.second->gredyRatio)
{
if (item.second->gredyTimes < item.second->gredyThreshold)
{
item.second->gredyTimes++;
}
if (item.second->gredyTimes == item.second->gredyThreshold)
{
item.second->penaltyFactor = 0.5;
item.second->size = 20;
}
}
else
{
if (item.second->gredyTimes > 0)
{
item.second->gredyTimes--;
}
}
}
for (auto &item : this->virtualQueues)
{
if (item.second->penaltyFactor > 0.99)
{
auto rate = this->calDiscountFactor(item.second->currentSize * 1.0 / item.second->size);
item.second->objectRate *= rate;
}
}
this->sendBits = 0;
this->recvBits = 0;
return currentRate;
}
uint32_t WDRRQueue::scheduleOnePacketForWFQ(uint32_t totalLastSeq)
{
if (this->cachePacketForWFQ != nullptr)
{
return this->cachePacketForWFQ->seq;
}
if (this->currentPacketNum == 0)
{
return UINT32_MAX;
}
this->cachePacketForWFQ = this->scheduleNext();
if (this->cachePacketForWFQ == nullptr)
{
return UINT32_MAX;
}
if (this->lastSeq == -1)
{
this->lastSeq = totalLastSeq;
}
this->cachePacketForWFQ->seq = this->lastSeq + 1.0 * this->cachePacketForWFQ->dataSize / this->weight;
if (this->currentPacketNum == 0)
{
this->lastSeq = -1;
}
else
{
this->lastSeq = this->cachePacketForWFQ->seq;
}
return this->cachePacketForWFQ->seq;
}
std::shared_ptr<VirtualQueueItem> DelaySensitiveServiceSP::scheduleNext()
{
auto res = this->CS7Queue->scheduleNext();
if (res == nullptr)
{
res = this->CS6Queue->scheduleNext();
}
if (res == nullptr)
{
res = this->EFQueue->scheduleNext();
}
return res;
}
uint32_t DelaySensitiveServiceSP::calObjectRate(bool getCurrentRateOnly)
{
uint32_t res = this->CS7Queue->calObjectRate(getCurrentRateOnly);
res += this->CS6Queue->calObjectRate(getCurrentRateOnly);
res += this->EFQueue->calObjectRate(getCurrentRateOnly);
return res;
}
std::shared_ptr<VirtualQueueItem> WFQQueue::scheduleNext()
{
std::shared_ptr<WDRRQueue> selectedQueue = nullptr;
uint32_t currentMinSeq = UINT32_MAX;
for (auto &queue : this->queues)
{
uint32_t seq = queue->scheduleOnePacketForWFQ(this->lastSeq);
if (seq < currentMinSeq)
{
currentMinSeq = seq;
selectedQueue = queue;
}
}
if (selectedQueue != nullptr)
{
this->lastSeq = currentMinSeq;
auto selectPacket = selectedQueue->frontForWFQ(this->lastSeq);
selectedQueue->popForWFQ();
return selectPacket;
}
return nullptr;
}
uint32_t WFQQueue::calObjectRate(uint32_t availableBandwidth)
{
int totalRate = 0;
double totalHightSpeedFlowWeight = 0;
int hightSppedFlowNum = 0;
double totalHasFlowQueueWeight = 0;
int hasFlowQueueNum = 0;
for (auto &queue : this->queues)
{
auto recvRate = queue->calObjectRate(true);
queue->originAvaliableBandwidth = (queue->getWeight() / totalWeight) * availableBandwidth;
auto remainRate = queue->getRemainBandwidthRate(queue->originAvaliableBandwidth);
if (remainRate < 0.2)
{
hightSppedFlowNum++;
totalHightSpeedFlowWeight += queue->getWeight();
totalRate += queue->originAvaliableBandwidth;
}
else if (remainRate < 0.95)
{
hasFlowQueueNum++;
totalHasFlowQueueWeight += queue->getWeight();
totalRate += recvRate;
}
else
{
totalRate += recvRate;
}
}
this->availableBandwidth = availableBandwidth;
int remain = this->availableBandwidth - totalRate;
if (hightSppedFlowNum > 0)
{
for (auto &queue : this->queues)
{
if (queue->getRemainBandwidthRate(queue->originAvaliableBandwidth) < 0.2)
{
queue->updateAvailableBandwidth(
queue->getWeight() * remain * 1.0 / totalHightSpeedFlowWeight +
queue->originAvaliableBandwidth);
}
else
{
queue->updateAvailableBandwidth(queue->originAvaliableBandwidth);
}
}
}
else
{
for (auto &queue : this->queues)
{
if (queue->getRemainBandwidthRate(queue->originAvaliableBandwidth) < 0.95)
{
queue->updateAvailableBandwidth(
queue->getWeight() * remain * 1.0 / totalHasFlowQueueWeight +
queue->originAvaliableBandwidth);
}
else
{
queue->updateAvailableBandwidth(queue->originAvaliableBandwidth);
}
}
}
uint32_t currentTotalRate = 0;
for (auto &queue : this->queues)
{
currentTotalRate += queue->calObjectRate(false);
}
return currentTotalRate;
}
std::shared_ptr<VirtualQueueItem> GlobalSP::scheduleNext()
{
std::shared_ptr<VirtualQueueItem> selectedPacket = this->delaySensitiveServiceSp.scheduleNext();
if (selectedPacket == nullptr)
{
selectedPacket = wfqQueue.scheduleNext();
}
if (selectedPacket != nullptr)
{
selectedPacket->waitNsTime = selectedPacket->dataSize * 1000000000.0 / this->availableBandwidth + 1;
// 计算 DR => DR'(/p, j) = min(OR(/p, j), RR(/p, j) / sum(RR(/p)) * sum(AR(/p)))
auto currentObjectRate = selectedPacket->queue->objectRate;
auto downstreamRate = selectedPacket->item->getDownstreamRate();
if (downstreamRate)
{
if (*downstreamRate > currentObjectRate)
{
selectedPacket->item->setDownstreamRate(currentObjectRate);
}
}
else
{
selectedPacket->item->setDownstreamRate(currentObjectRate);
}
}
return selectedPacket;
}
std::shared_ptr<WDRRQueue> GlobalSP::getQueueByTos(int tosValue)
{
switch (tosValue)
{
case 0:
return this->delaySensitiveServiceSp.getCS7Queue();
case 1:
return this->delaySensitiveServiceSp.getCS6Queue();
case 2:
return this->delaySensitiveServiceSp.getEFQueue();
case 3:
return this->wfqQueue.getAF4Queue();
case 4:
return this->wfqQueue.getAF3Queue();
case 5:
return this->wfqQueue.getAF2Queue();
case 6:
return this->wfqQueue.getAF1Queue();
case 7:
default:
return this->wfqQueue.getBEQueue();
}
}
bool GlobalSP::appendInterest(const Interest &interest)
{
int serviceClass = 7;
if (interest.getServiceClass())
{
serviceClass = (*interest.getServiceClass()) % 8;
}
return this->getQueueByTos(serviceClass)->appendInterest(interest);
}
void GlobalSP::receiveInterest(const Interest &interest)
{
ndn::encoding::EncodingEstimator estimator;
auto size = interest.wireEncode(estimator) + 9;
this->interestBytes += size;
}
void GlobalSP::calObjectRate(bool getCurrentRateOnly)
{
getScheduler().schedule(time::nanoseconds(1000000 * this->calObjectRateInterval),
[this]
{ this->calObjectRate(false); });
uint32_t remain = this->availableBandwidth;
remain -= this->interestBytes * 8000 / this->calObjectRateInterval;
this->interestBytes = 0;
remain -= this->delaySensitiveServiceSp.calObjectRate(false);
this->wfqQueue.calObjectRate(remain);
}
void GlobalSP::assignObjectRate(const Data &data)
{
std::string key = data.getName().getPrefix(1).toUri();
int serviceClass = 7;
if (data.getServiceClass())
{
serviceClass = (*data.getServiceClass()) % 8;
}
auto objectRate = this->getQueueByTos(serviceClass)->getObjectRateByFlowKey(key);
auto targetRate = data.getTargetRate();
if (!targetRate || *targetRate > objectRate)
{
data.setTargetRate(objectRate);
}
}
}
}
+338
View File
@@ -0,0 +1,338 @@
#ifndef NS_QUEUES_HPP
#define NS_QUEUES_HPP
#include <deque>
#include <unordered_map>
#include <memory>
#include "common/global.hpp"
#include "link-service.hpp"
#include <ndn-cxx/lp/tags.hpp>
#include "TokenBucket.hpp"
#include <iostream>
namespace nfd
{
namespace face
{
class VirtualQueue;
struct VirtualQueueItem
{
int seq;
std::shared_ptr<const Interest> item;
EndpointId endpointId;
uint32_t dataSize;
uint32_t waitNsTime;
std::shared_ptr<VirtualQueue> queue;
};
class VirtualQueue : public std::enable_shared_from_this<VirtualQueue>
{
public:
explicit VirtualQueue(double weight, int size) : std::enable_shared_from_this<VirtualQueue>(),
penaltyFactor(1.0),
weight(weight), size(size), currentSize(0), lastSeq(0),
C(0)
{
}
void enableTokenBucket(uint32_t limitRate)
{
this->C = limitRate;
}
bool push(int seq, const std::shared_ptr<const Interest> &interest, const EndpointId &endpointId,
uint64_t dataSize, bool needInsert);
bool empty()
{
return this->innerQueue.empty();
}
uint32_t frontSeq()
{
if (this->innerQueue.empty())
{
return this->lastSeq;
}
return this->innerQueue.front().seq;
}
VirtualQueueItem front()
{
return this->innerQueue.front();
}
uint32_t backSeq()
{
return this->lastSeq;
}
VirtualQueueItem back()
{
return this->innerQueue.back();
}
void pop()
{
if (this->currentSize <= 0)
{
return;
}
this->currentSize--;
this->innerQueue.pop_front();
}
void popBack()
{
if (this->currentSize <= 0)
{
return;
}
this->currentSize--;
this->innerQueue.pop_back();
}
int len()
{
return this->innerQueue.size();
}
void setLastSeq(uint32_t seq)
{
this->lastSeq = seq;
}
uint32_t getLastSeq()
{
return lastSeq;
}
void setWeight(double weight)
{
this->weight = weight;
}
double getWeight()
{
return this->weight;
}
public:
uint32_t objectRate;
double penaltyFactor = 1.0;
uint32_t gredyTimes = 0;
double gredyRatio = 1.2;
double gredyThreshold = 5;
double weight = 1.0;
uint32_t tos = 5;
double balance = 0;
int size = 0;
int currentSize = 0;
uint32_t recvBits = 0;
uint32_t currentFlowRate = 0;
double currentRatio = 0.0;
private:
uint32_t lastSeq;
int C;
std::deque<VirtualQueueItem> innerQueue;
};
class WDRRQueue
{
public:
explicit WDRRQueue(uint32_t rateLimit, uint32_t availableBandwidth, double weight, const std::string &name,
uint32_t tos = 5, size_t vqSize = 20);
void updateAvailableBandwidth(uint32_t C);
bool appendInterest(const Interest &interest, const EndpointId &endpointId);
std::shared_ptr<VirtualQueueItem> scheduleNext();
uint32_t calObjectRate(bool getCurrentRateOnly);
void setCDTLimit(int cdt)
{
this->cdt = cdt;
}
uint32_t scheduleOnePacketForWFQ(uint32_t totalLastSeq);
std::shared_ptr<VirtualQueueItem> frontForWFQ(uint32_t totalLastSeq)
{
if (this->cachePacketForWFQ == nullptr)
{
this->scheduleOnePacketForWFQ(totalLastSeq);
}
return this->cachePacketForWFQ;
}
void popForWFQ()
{
this->cachePacketForWFQ = nullptr;
}
double getRemainBandwidthRate(double curAvailableBandWidth)
{
auto currentRate = this->calObjectRate(true);
if (currentRate > curAvailableBandWidth)
{
return 0;
}
return (curAvailableBandWidth - currentRate) * 1.0 / curAvailableBandWidth;
}
uint32_t getRemainBandwidth()
{
return this->availableBandwidth - this->calObjectRate(true);
}
double getWeight()
{
return this->weight;
}
uint32_t getAvailableBandwidth()
{
return this->availableBandwidth;
}
/**
* @brief
*
* \sigma=\frac{1}{3}\arccot\funcapply(8\delta-4)
* @param x
* @return double
*/
double calDiscountFactor(double x)
{
return 1.0 / 3 * (M_PI / 2 - atan(8 * x - 4)) + 0.034;
}
/**
* @brief Get the Object Rate By Flow Key object
*
* @param key
* @return uint32_t
*/
uint32_t getObjectRateByFlowKey(const std::string &key)
{
if (this->virtualQueues.count(key) == 0)
{
std::cout << "FUCK: " << key << ", count: " << this->virtualQueues.size() << ", flow: " << this->flows.size() << "availbleRate: " << this->availableBandwidth << std::endl;
return 0;
}
return this->virtualQueues[key]->objectRate;
}
public:
double originAvaliableBandwidth = 0;
private:
std::unordered_map<std::string, std::shared_ptr<VirtualQueue>> virtualQueues;
std::vector<std::string> flows;
size_t nextScheduleQueue = 0;
std::shared_ptr<TokenBucket> tokenBucket;
uint32_t availableBandwidth;
// uint32_t extraBandwidth;
double weight;
std::string name;
uint32_t tos = 5;
bool needRateLimit;
double totalWeight;
uint32_t sendBits = 0;
uint32_t recvBits = 0;
int vqSize = 20;
int cdt = -1;
int currentPacketNum = 0;
int MTU_QUANTA = 2 * 1000 * 8;
int calObjectRateInterval = 100;
int64_t lastSeq = -1;
std::shared_ptr<VirtualQueueItem> cachePacketForWFQ = nullptr;
};
class DelaySensitiveServiceSP
{
public:
explicit DelaySensitiveServiceSP(uint32_t availableBandwidth, size_t vqSize = 20)
{
this->CS7Queue = std::make_shared<WDRRQueue>((uint32_t)(0.2 * availableBandwidth), (uint32_t)(0.2 * availableBandwidth), 2, "CS7", 2, vqSize);
this->CS6Queue = std::make_shared<WDRRQueue>((uint32_t)(0.2 * availableBandwidth), (uint32_t)(0.2 * availableBandwidth), 2, "CS6", 1, vqSize);
this->EFQueue = std::make_shared<WDRRQueue>((uint32_t)(0.3 * availableBandwidth), (uint32_t)(0.3 * availableBandwidth), 3, "EF", 0, vqSize);
}
std::shared_ptr<VirtualQueueItem> scheduleNext();
uint32_t calObjectRate(bool getCurrentRateOnly);
std::shared_ptr<WDRRQueue> getCS7Queue()
{
return this->CS7Queue;
}
std::shared_ptr<WDRRQueue> getCS6Queue()
{
return this->CS6Queue;
}
std::shared_ptr<WDRRQueue> getEFQueue()
{
return this->EFQueue;
}
private:
std::shared_ptr<WDRRQueue> CS7Queue;
std::shared_ptr<WDRRQueue> CS6Queue;
std::shared_ptr<WDRRQueue> EFQueue;
};
class WFQQueue
{
public:
explicit WFQQueue(uint32_t availableBandwidth, size_t vqSize) : AF4Weight(5),
AF3Weight(4),
AF2Weight(3),
AF1Weight(2),
BEWeight(1),
totalWeight(AF4Weight + AF3Weight + AF2Weight + AF1Weight + BEWeight),
queues(5),
lastSeq(0),
availableBandwidth(availableBandwidth)
{
this->queues[0] = std::make_shared<WDRRQueue>(0, availableBandwidth * AF4Weight / totalWeight, 5, "AF4", 3, vqSize);
this->queues[1] = std::make_shared<WDRRQueue>(0, availableBandwidth * AF3Weight / totalWeight, 4, "AF3", 4, vqSize);
this->queues[2] = std::make_shared<WDRRQueue>(0, availableBandwidth * AF2Weight / totalWeight, 3, "AF2", 5, vqSize);
this->queues[3] = std::make_shared<WDRRQueue>(0, availableBandwidth * AF1Weight / totalWeight, 2, "AF1", 6, vqSize);
this->queues[4] = std::make_shared<WDRRQueue>(0, availableBandwidth * BEWeight / totalWeight, 1, "BE", 7, vqSize);
for (auto &queue : this->queues)
{
queue->setCDTLimit(this->cdt);
}
}
std::shared_ptr<VirtualQueueItem> scheduleNext();
uint32_t calObjectRate(uint32_t availableBandwidth);
std::shared_ptr<WDRRQueue> getAF4Queue()
{
return this->queues[0];
}
std::shared_ptr<WDRRQueue> getAF3Queue()
{
return this->queues[1];
}
std::shared_ptr<WDRRQueue> getAF2Queue()
{
return this->queues[2];
}
std::shared_ptr<WDRRQueue> getAF1Queue()
{
return this->queues[3];
}
std::shared_ptr<WDRRQueue> getBEQueue()
{
return this->queues[4];
}
private:
double AF4Weight = 5;
double AF3Weight = 4;
double AF2Weight = 3;
double AF1Weight = 2;
double BEWeight = 1;
double totalWeight = 15;
std::vector<std::shared_ptr<WDRRQueue>> queues;
uint32_t lastSeq;
u_int32_t availableBandwidth;
int cdt = 200;
int hql;
};
class GlobalSP
{
public:
explicit GlobalSP(uint32_t availableBandwidth, size_t vqSize = 20) : delaySensitiveServiceSp(availableBandwidth, vqSize),
wfqQueue(availableBandwidth, vqSize),
availableBandwidth(availableBandwidth)
{
getScheduler().schedule(time::nanoseconds(1000000 * this->calObjectRateInterval),
[this]
{ this->calObjectRate(false); });
}
std::shared_ptr<VirtualQueueItem> scheduleNext();
bool appendInterest(const Interest &interest, const EndpointId &endpointId);
void receiveInterest(const Interest &interest);
void calObjectRate(bool getCurrentRateOnly);
std::shared_ptr<WDRRQueue> getQueueByTos(int tosValue);
void assignObjectRate(const Data &data);
private:
DelaySensitiveServiceSP delaySensitiveServiceSp;
WFQQueue wfqQueue;
int calObjectRateInterval = 100;
uint32_t availableBandwidth;
size_t interestBytes = 0;
};
}
}
#endif // NS_QUEUES_HPP