Add preview of MIRCC multi-stream version

This commit is contained in:
Lv
2022-02-18 08:40:14 -08:00
parent ac935a8fc8
commit cb53fb551d
2 changed files with 500 additions and 0 deletions
+376
View File
@@ -0,0 +1,376 @@
#include "mircc-strategy.hpp"
namespace nfd {
namespace fw {
MIRCCStrategy::MIRCCStrategy(Forwarder &forwarder, const Name &name = getStrategyName())
: Strategy(forwarder), ProcessNackTraits<DSCCPStrategy>(this)
{
this->setInstanceName(makeInstanceName(name, getStrategyName()));
FaceTable& faceTable = m_forwarder.m_faceTable;
for (auto i = faceTable.begin();i != faceTable.end();i++)
{
m_Rp_base[i] = getMaxR();
m_Rp_ex[i] = 0;
m_Rs_base[i] = 0;
m_Rs_ex[i] = 0;
m_Yp[i] = 0;
m_Yp_old[i] = 0;
m_Ys[i] = 0;
m_Ys_old[i] = 0;
m_d[i] = 500;
m_ratio[i] = 3000/INTEREST_SIZE;
m_linkC[i] = 10 * 1000 * 1000 * 8;
sendInterest(i);
calcRt(i);
}
//初始化hash
hashUnit[0] = 1;
for (int i = 1;i < 100;i++)
{
hashUnit[i] = hashUnit[i - 1] * hashNum % hashMo;
}
//初始化逆元
invMod = fpow(hashNum,hashMo - 2,hashMo);
}
void
MIRCCStrategy::sendQueueInterest(FaceEndpoint &ingress)
{
//设置下一次发送,根据兴趣包与数据包的大小之比,计算出兴趣包可用带宽,进而计算出每发送一个兴趣包需要的时间,最小为1ms
getScheduler().schedule(time::milliseconds(max(1,INTEREST_SIZE / (m_linkC[ingress] * (1.0 / (m_ratio[ingress] + 1))))), std::bind(&MIRCCStrategy::sendQueueInterest, this, ingress));
//清空队列中过期的兴趣包
refreshQueueInterest(ingress);
//分三种情况讨论,只有第一类兴趣包可以发,只有第二类兴趣包可以发,两类可以发(选择最早到达的)
if (m_Queue[ingress].qp.size() > 0 and m_Queue[ingress].qs.size() == 0)
{
//发送兴趣包
sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qp.front());
//这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能
m_Queue[ingress].qp.pop_front();
m_Queue[ingress].tp.pop_front();
}else if (m_Queue[ingress].qs.size() > 0 and m_Queue[ingress].qp.size() == 0)
{
//发送兴趣包
sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qs.front());
//这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能
m_Queue[ingress].qs.pop_front();
m_Queue[ingress].ts.pop_front();
}else
{
//如果第一类兴趣包更早到达
if (m_Queue[ingress].tp.front() < m_Queue[ingress].ts.front())
{
//发送兴趣包
sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qp.front());
//这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能
m_Queue[ingress].qp.pop_front();
m_Queue[ingress].tp.pop_front();
}else
{
//发送兴趣包
sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qs.front());
//这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能
m_Queue[ingress].qs.pop_front();
m_Queue[ingress].ts.pop_front();
}
}
}
void
MIRCCStrategy::calcRt(FaceEndpoint &ingress)
{
//每100ms自动重新计算Rt
getScheduler().schedule(time::milliseconds(100), std::bind(&MIRCCStrategy::calcRt, this, ingress));
uint64_t ia,ib,Rp,Rs;
Rp = m_Rp_base[ingress] + m_Rp_ex[ingress];
Rs = m_Rs_base[ingress] + m_Rs_ex[ingress];
//Rp计算过程
ia = std::max(m_linkC[ingress],m_Yp[ingress]);
ib = std::max(Rp,1);
uint64_t Np = ia / ib;
ia = m_Yp[ingress] - m_Yp_old[ingress];
ib = std::max(m_Yp[ingress],1);
double Bp = std::max(m_B,(double)ia / ib);
ia = m_miu * m_linkC[ingress] - Bp * getQt() / std::max(m_d[ingress],0.001);
ib = std::max(Np,1);
m_Rp_base[ingress] = m_a * m_Rp_base[ingress] + (1 - m_a) * ia / ib;
ia = m_Yp[ingress];
ib = std::max(Np,1);
m_Rp_ex[ingress] = m_a * m_Rp_ex[ingress] + (1 - m_a) * (Rp - ia / ib);
m_Yp_old[ingress] = m_Yp[ingress];
//Rs计算过程
uint64_t Cs = std::max(m_linkC[ingress] - m_Yp[ingress],0);
uint64_t Ns = std::max(max(Cs,m_Ys[ingress]) / Rs,1);
ia = m_Ys[ingress] - m_Ys_old[ingress];
ib = std::max(m_Ys[ingress],1);
double Bs = std::max(m_B,(double)ia / ib);
ia = m_miu * Cs - Bs * getQt() / std::max(m_d[ingress],0.001);
ib = std::max(Ns,1);
m_Rs_base[ingress] = m_a * m_Rs_base[ingress] + (1 - m_a) * ia / i b;
ia = m_Ys[ingress];
ib = std;:max(Ns,1);
m_Rs_ex[ingress] = m_a * m_Rs_ex[ingress] + (1 - m_a) * (Rs - ia / ib);
m_Ys_old[ingress] = m_Ys[ingress];
m_Yp[ingress] = 0;
m_Ys[ingress] = 0;
}
//在队列大小限制范围内的兴趣包,转发过程不会超时,这个功能暂时没用。
/*
void
MIRCCStrategy::refreshQueueInterest(FaceEndpoint &ingress)
{
//清空face队列中过期的兴趣包
auto now = time::steady_clock::now();
while (m_Queue[ingress].qp.size() > 0 and m_Queue[ingress].tp.front() < now)
{
m_Queue[ingress].qp.pop_front();
m_Queue[ingress].tp.pop_front();
}
while (m_Queue[ingress].qs.size() > 0 and m_Queue[ingress].ts.front() < now)
{
m_Queue[ingress].qs.pop_front();
m_Queue[ingress].ts.pop_front();
}
}
*/
uint64_t
MIRCCStrategy::getQt(FaceEndpoint &ingress)
{
//清除队列中过期兴趣包,避免造成干扰
//refreshQueueInterest(ingress);
//返回兴趣包个数*兴趣包大小*数据包与兴趣包大小之比
return (m_Queue[ingress].qp.size() + m_Queue[ingress].qs.size()) * INTEREST_SIZE * m_ratio[ingress];
}
void
MIRCCStrategy::pushQueueInterest(FaceEndpoint &ingress, const Interest &interest)
{
//更新Yt
updateYt(ingress,interest);
uint64_t p = interest.getTag<lp::PTag>();
//如果是第一类兴趣包
if (p == 1)
{
//如果队列已经满了
if (m_Queue[ingress].qp.size() + m_Queue[ingress].qs.size() >= QUEUE_MAX)
{
//如果存在第二类兴趣包可以删除
if (m_Queue[ingress].qs.size() > 0)
{
//删掉第二类兴趣包最后一个,把当前第一类兴趣包加入
m_Queue[ingress].qs.pop_back();
m_Queue[ingress].ts.pop_back();
m_Queue[ingress].qp.push_back(interest);
m_Queue[ingress].tp.push_back(time::steady_clock::now());
}else
{
//否则当前兴趣包丢失
//这里应该回传Nack,还没有写完
}
}else
{
//直接将当前兴趣包加入第一类兴趣包队列末尾
m_Queue[ingress].qp.push_back(interest);
m_Queue[ingress].tp.push_back(time::steady_clock::now());
}
}else
{
//如果队列已经满了
if (m_Queue[ingress].qp.size() + m_Queue[ingress].qs.size() >= QUEUE_MAX)
{
//当前兴趣包丢失
//这里是否应该回传Nack不确定,需要回去看论文,第二类兴趣包处理方式似乎与第一类不同,还没有写完
}else
{
//将但钱兴趣包加入第二类末尾
m_Queue[ingress].qs.push_back(interest);
m_Queue[ingress].ts.push_back(time::steady_clock::now());
}
}
}
void
MIRCCStrategy::updateYt(FaceEndpoint &ingress, const Interest &interest)
{
uint64_t p = interest.getTag<lp::PTag>();
if (p == 1)
{
m_Yp[ingress] += INTEREST_SIZE * m_ratio[ingress];
}else
{
m_Ys[ingress] += INTEREST_SIZE * m_ratio[ingress];
}
}
void
MIRCCStrategy::afterReceiveInterest(const FaceEndpoint &ingress, const Interest &interest, const shared_ptr<pit::Entry> &pitEntry)
{//目前在按照标识路径转发,采用MIRCC论文中的方法2
//MIRCC似乎不需要判断isNextHopEligible???
//稍微有点不理解Out-record在干啥
if (hasPendingOutRecords(*pitEntry))
{
//已经有记录,等待即可。
//这个位置要不要在其他路径继续转发(比如pathid不相同的情况),不太确定
//我记得原文是说不考虑任何的中间的cs命中之类的情况
return;
}
//根据hash值获取一个参考端口号,还需要根据可用端口数量进一步取模来最终选取转发端口。
uint64_t tmpFace = chooseFaceForInterestByHash(interest);
const fib::Entry& fibEntry = this->lookupFib(*pitEntry);
NextHopList outFaces = fibEntry.getNextHops();
if (outFace.size() != 0)
{//如果存在端口可以转发
uint64_t choosedFace = tmpFace % outFace.size();
Face& outFace = choosedFace[choosedFace].getFace();
//放入对应端口队列进行排队
this->pushQueueInterest(pitEntry, FaceEndpoint(outFace, 0), interest);
return;
}
//如果发现没有端口可以转发,则设置立马删除该条pit
//后续Nack处理是否得当并不清楚,还需要研究
lp::NackHeader nackHeader;
nackHeader.setReason(lp::NackReason::NO_ROUTE);
this->sendNack(pitEntry, ingress, nackHeader);
this->rejectPendingInterest(pitEntry);
return;
}
void
MIRCCStrategy::afterReceiveNack(const FaceEndpoint &ingress, const lp::Nack &nack, const shared_ptr<pit::Entry> &pitEntry)
{
//nack具体怎么处理还需要研究
//nack应该利用预留给data的带宽,返回给consumer
//Forwarder::onIncomingNack似乎已经把Nack返回给consumer了
}
void
MIRCCStrategy::beforeSatisfyInterest(const shared_ptr<pit::Entry> &pitEntry, const FaceEndpoint &ingress, const Data &data)
{
auto now = time::steady_clock::now();
uint64_t rcd_cnt = 0;
//目前计算中,只考虑没有过期的并成功收到数据包的兴趣包对RTT和ratio的影响,超时的数据包是否对RTT有影响?。
for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
if (inRecord.getExpiry() > now)
{
if (inRecord.getFace().getId() == ingress.face.getId() && inRecord.getFace().getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC)
{
continue;
}
//计算收到数据包对数据包/兴趣包大小比率的影响
m_ratio[ingress] = m_ratio_a * m_ratio[ingress] + (1 - m_ratio_a) * (data.getContent().size() / INTEREST_SIZE);
//计算收到数据包对RTT的影响。
uint64_t tmpRTT = time::steady_clock::now() - inRecord.getLastRenewed();//不确定正确性,可能还要继续考虑
m_d[ingress] = m_rtt_a * m_d[ingress] + (1 - m_rtt_a) * tmpRTT;
}
}
}
void
MIRCCStrategy::fastpow(int64_t a,int64_t k,int64_t p)
{
int64_t = 1;
while (k != 0)
{
if (k % 2 != 0)
{
res = res * a % p;
}
k >>= 1;
a = a * a % p;
}
return res;
}
void
MIRCCStrategy::setHashForData(const FaceEndpoint &ingress, const Data &data)
{//注意!!HopCountTag的工作情况目前不太了解,似乎ndnsim这个Tag没做处理,后续需要考虑
//处理路径标识id
uint64_t hashOld = data.getTag<lp::PathidTag>();
//uint64_t hashNew = (hashOld + hashUnit[data.getTag<lp::HopCountTag>()] * ingress.getFace().getId() % hashMo) % hashMo;
//靠近producer的为hash值的高位
uint64_t hashNew = (hashOld * hashNum % hashMo + ingress.getFace().getId() % hashMo) % hashMo;
//给data设置新的hash值
data.setTag(make_shared<lp::HopCountTag>(hashNew));
}
uint64_t
MIRCCStrategy::chooseFaceForInterestByHash(const Interest &interest)
{
uint64_t hashGuide = data.getTag<lp::PathidTag>();
uint64_t Faceid = hashGuide % hashMo % hashNum;
return Faceid;
}
void
MIRCCStrategy::afterReceiveData(const shared_ptr<pit::Entry> &pitEntry, const FaceEndpoint &ingress, const Data &data)
{
setHashForData(ingress, data)
//修改Rt的值
uint64_t dataRp = data.getTag<lp::RpTag>();
uint64_t dataRs = data.getTag<lp::RsTag>();
if (dataRp > m_Rp_base[ingress] + m_Rp_ex[ingress])
{
data.setTag(make_shared<lp::RpTag>(m_Rp_base[ingress] + m_Rp_ex[ingress]));
}
if (dataRs > m_Rs_base[ingress] + m_Rs_ex[ingress])
{
data.setTag(make_shared<lp::RsTag>(m_Rs_base[ingress] + m_Rs_ex[ingress]));
}
beforeSatisfyInterest(pitEntry, ingress, data);
sendDataToAll(pitEntry, ingress, data);
}
}
}
+124
View File
@@ -0,0 +1,124 @@
#include "face/face.hpp"
#include "fw/strategy.hpp"
#include "fw/algorithm.hpp"
#include "fw/process-nack-traits.hpp"
#include <unordered_map>
#include <limits>
#include <deque>
namespace nfd {
namespace fw {
class MIRCCQueue
{
public:
std:: deque<Interest> qp,qs;
//记录的是过期时间,非达到时间
std:: deque<time::steady_clock::TimePoint> tp,ts;
};
/*
MIRCC 策略实现
*/
class MIRCCStrategy : public Strategy, public ProcessNackTraits<MIRCCStrategy>
{
public:
explicit MIRCCStrategy(Forwarder &forwarder, const Name &name = getStrategyName());
~MIRCCStrategy() override = default;
void
afterReceiveInterest(const FaceEndpoint &ingress, const Interest &interest,const shared_ptr<pit::Entry> &pitEntry) override;
void
afterReceiveNack(const FaceEndpoint &ingress, const lp::Nack &nack,const shared_ptr<pit::Entry> &pitEntry) override;
void
beforeSatisfyInterest(const shared_ptr<pit::Entry> &pitEntry,const FaceEndpoint &ingress, const Data &data) override;
void
afterReceiveData(const shared_ptr<pit::Entry> &pitEntry,const FaceEndpoint &ingress, const Data &data) override;
/**
* 获取最大的目标速率
* @return
*/
static uint64_t
getMaxR()
{
return std::numeric_limits<uint32_t>::max();
}
static const Name &
getStrategyName();
protected:
friend ProcessNackTraits<DSCCPStrategy>;
private:
unordered_map<FaceEndpoint,uint64_t> m_Rp_base;//Rp的base速率
unordered_map<FaceEndpoint,uint64_t> m_Rp_ex;//Rp的ex速率
unordered_map<FaceEndpoint,uint64_t> m_Rs_base;//Rs的base速率
unordered_map<FaceEndpoint,uint64_t> m_Rs_ex;//Rs的ex速率
unordered_map<FaceEndpoint,uint64_t> m_Yp;//本周期的Yp(t)
unordered_map<FaceEndpoint,uint64_t> m_Yp_old;//上一个周期的Yp(t)
unordered_map<FaceEndpoint,uint64_t> m_Ys;//本周起的Ys(t)
unordered_map<FaceEndpoint,uint64_t> m_Ys_old;//上一个周期的Ys(t)
unordered_map<FaceEndpoint,uint64_t> m_d;//维护每一个face的RTT
unorderer_map<FaceEndpoint,MIRCCQueue> m_Queue;//用于出口排队转发兴趣包
unordered_map<FaceEndpoint,uint64_t> m_ratio;//维护每一个face的Data/Interest
//这个大小之比要按照第一类,第二类进行分类么?
unordered_map<FaceEndpoint,uint64_t> m_linkC;//每一个链路的带宽,目前不知道怎么获取,都设置为10 * 1000 * 1000 * 8
//通过兴趣包标注的path来获得对应的转发端口
//采用MIRCC论文中提出的第二个方案,由consumer随机生成hash值来引导转发,路由器无需在记录对应关系
//unorderer_map<unint64_t,FaceEndPoint> pathid2face;
//兴趣包大小常量
const unint64_t INTEREST_SIZE = 300;
//队列大小常量
const unint64_t QUEUE_MAX = 1000;
const double m_B = 0.1,m_a = 0.5,m_miu = 0.95,m_ratio_a = 0.99 ,m_rtt_a = 0.99;
const uint64_t = hashUnit[100],hashMo = 998244353,hashNum = 37,invMod = 0;
//发送对应端口1个正在排队的兴趣包,并设置下一次发送时间
void
sendQueueInterest(FaceEndpoint &ingress);
//清空对应端口队列中所有过期的兴趣包
void
refreshQueueInterest(FaceEndpoint &ingress);
//将兴趣包放入对应端口的队列
void
pushQueueInterest(FaceEndpoint &ingress, const Interest &interest);
//计算并返回q(t)
uint64_t
getQt();
//计算Rt
void
calcRt(FaceEndpoint &ingress);
//更新Yt
void
updateYt(FaceEndpoint &ingress, const Interest &interest);
void
setHashForData(const FaceEndpoint &ingress, const Data &data);
uint64_t
fastpow(int64_t a,int64_t k,int64_t p);
};
}
}