Files
mircc/scenario/extensions/mircc-strategy.cpp
T
2022-07-01 06:12:20 -07:00

537 lines
23 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#include "mircc-strategy.hpp"
NFD_LOG_INIT(MIRCCStrategy);
namespace nfd {
namespace fw {
MIRCCStrategy::MIRCCStrategy(Forwarder &forwarder, const ndn::Name &name)
: Strategy(forwarder), ProcessNackTraits<MIRCCStrategy>(this)
{
this->setInstanceName(makeInstanceName(name, getStrategyName()));
time::nanoseconds ttime(1);
getScheduler().schedule(ttime, bind(&MIRCCStrategy::mirccinit, this));
}
const Name&
MIRCCStrategy::getStrategyName()
{
static Name strategyName("/localhost/nfd/strategy/mircc/%FD%01");
return strategyName;
}
void
MIRCCStrategy::mirccinit()
{
const FaceTable &faceTable = getFaceTable();
for (auto& tmp : faceTable)
{
//非外部端口跳过
if (tmp.getId() < 257)
continue;
m_linkC[inface(tmp.getId())] = 1 * 1000 * 1000;//初始化均为1Mb/s
m_Rp_base[inface(tmp.getId())] = m_linkC[inface(tmp.getId())];
m_Rp_ex[inface(tmp.getId())] = 0;
m_Rs_base[inface(tmp.getId())] = 0;
m_Rs_ex[inface(tmp.getId())] = 0;
m_Yp[inface(tmp.getId())] = 0;
m_Yp_old[inface(tmp.getId())] = 0;
m_Ys[inface(tmp.getId())] = 0;
m_Ys_old[inface(tmp.getId())] = 0;
m_d[inface(tmp.getId())] = 0.03;
m_running[inface(tmp.getId())] = 0;
//sendQueueInterest(tmp.getId());
calcRt(tmp.getId());
}
}
void
MIRCCStrategy::sendQueueInterest(FaceId mface)
{
//设置下一次发送,根据兴趣包与数据包的大小之比,计算出兴趣包可用带宽,进而计算出每发送一个兴趣包需要的时间,最小为1ms
if (m_Queue[inface(mface)].qp.size() + m_Queue[inface(mface)].qs.size() > 1)
{//
time::nanoseconds ttime(std::max((uint64_t)1,(uint64_t)1000000000 * DATA_SIZE / m_linkC[inface(mface)]));
getScheduler().schedule(ttime, bind(&MIRCCStrategy::sendQueueInterest, this, mface));
}else
{
m_running[inface(mface)] = 0;
}
//std::cout << (uint64_t)1000000000 * DATA_SIZE / m_linkC[inface(mface)] << std::endl;
//分三种情况讨论,只有第一类兴趣包可以发,只有第二类兴趣包可以发,两类可以发(选择最早到达的)
if (m_Queue[inface(mface)].qp.size() > 0 and m_Queue[inface(mface)].qs.size() == 0)
{
shared_ptr<Interest> interest = m_Queue[inface(mface)].qp.front();
sendInterest(m_Queue[inface(mface)].pp.front(),FaceEndpoint(*getFaceTable().get(mface),0),*interest);
m_Queue[inface(mface)].qp.pop_front();
m_Queue[inface(mface)].tp.pop_front();
m_Queue[inface(mface)].pp.pop_front();
m_on++;
}else if (m_Queue[inface(mface)].qs.size() > 0 and m_Queue[inface(mface)].qp.size() == 0)
{
shared_ptr<Interest> interest = m_Queue[inface(mface)].qs.front();
sendInterest(m_Queue[inface(mface)].ps.front(),FaceEndpoint(*getFaceTable().get(mface),0),*interest);
m_Queue[inface(mface)].qs.pop_front();
m_Queue[inface(mface)].ts.pop_front();
m_Queue[inface(mface)].ps.pop_front();
m_on++;
}else if (m_Queue[inface(mface)].qs.size() > 0 and m_Queue[inface(mface)].qp.size() > 0)
{
m_on++;
if (m_Queue[inface(mface)].tp.front() < m_Queue[inface(mface)].ts.front())
{
shared_ptr<Interest> interest =m_Queue[inface(mface)].qp.front();
sendInterest(m_Queue[inface(mface)].pp.front(),FaceEndpoint(*getFaceTable().get(mface),0),*interest);
m_Queue[inface(mface)].qp.pop_front();
m_Queue[inface(mface)].tp.pop_front();
m_Queue[inface(mface)].pp.pop_front();
}else
{
shared_ptr<Interest> interest = m_Queue[inface(mface)].qs.front();
sendInterest(m_Queue[inface(mface)].ps.front(),FaceEndpoint(*getFaceTable().get(mface),0),*interest);
m_Queue[inface(mface)].qs.pop_front();
m_Queue[inface(mface)].ts.pop_front();
m_Queue[inface(mface)].ps.pop_front();
}
}else
{
m_miss++;
}
/*
//设置下一次发送,根据兴趣包与数据包的大小之比,计算出兴趣包可用带宽,进而计算出每发送一个兴趣包需要的时间,最小为1ms
time::nanoseconds ttime(std::max((uint64_t)1,(uint64_t)1000000000 * DATA_SIZE / m_linkC[inface(mface)]));
getScheduler().schedule(ttime, bind(&MIRCCStrategy::sendQueueInterest, this, mface));
shared_ptr<Interest> fi,si;
time::steady_clock::TimePoint ft,st;
shared_ptr<pit::Entry> fp,sp;
m_Queue[inface(mface)].f.getHead(fi,ft,fp);
m_Queue[inface(mface)].s.getHead(si,st,sp);
//分三种情况讨论,只有第一类兴趣包可以发,只有第二类兴趣包可以发,两类可以发(选择最早到达的)
if (m_Queue[inface(mface)].f.getSize() > 0 and m_Queue[inface(mface)].s.getSize() == 0)
{
sendInterest(fp,FaceEndpoint(*getFaceTable().get(mface),0),*fi);
m_Queue[inface(mface)].f.deQueueFront();
}else if (m_Queue[inface(mface)].s.getSize() > 0 and m_Queue[inface(mface)].f.getSize() == 0)
{
sendInterest(sp,FaceEndpoint(*getFaceTable().get(mface),0),*si);
m_Queue[inface(mface)].s.deQueueFront();
}else if (m_Queue[inface(mface)].f.getSize() > 0 and m_Queue[inface(mface)].s.getSize() > 0)
{
if (ft < st)
{
sendInterest(fp,FaceEndpoint(*getFaceTable().get(mface),0),*fi);
m_Queue[inface(mface)].f.deQueueFront();
}else
{
sendInterest(sp,FaceEndpoint(*getFaceTable().get(mface),0),*si);
m_Queue[inface(mface)].s.deQueueFront();
}
}else
{
}*/
}
void
MIRCCStrategy::calcRt(FaceId mface)
{
//每100ms自动重新计算Rt
time::nanoseconds ttime(100000000);
getScheduler().schedule(ttime, std::bind(&MIRCCStrategy::calcRt, this, mface));
// std::cout << (double)m_miss / std::max(m_on,(uint64_t)1) << std::endl;
long long ia,ib,Rp,Rs;
double db;
Rp = m_Rp_base[inface(mface)] + m_Rp_ex[inface(mface)];
Rs = m_Rs_base[inface(mface)] + m_Rs_ex[inface(mface)];
// NS_LOG_DEBUG("face:" << mface << "d:" << m_d[inface(mface)]);
// NS_LOG_DEBUG("Rp: " << Rp << " Rs: " << Rs << " Yt: " << m_Yp[inface(mface)] << " C:" << m_linkC[inface(mface)]);
//Rp计算过程
ia = std::max(m_linkC[inface(mface)],m_Yp[inface(mface)]);
ib = std::max(Rp,1ll);
double Np = std::max((double)ia / ib,1.0);
// NS_LOG_DEBUG(" Np: " << Np);
double Bp;
if (m_Yp[inface(mface)] >= m_Yp_old[inface(mface)])
{
ia = m_Yp[inface(mface)] - m_Yp_old[inface(mface)];
ib = std::max(m_Yp[inface(mface)],(uint64_t)1);
Bp = std::max(m_B,(double)ia / ib);
}else
{
Bp = m_B;
}
// NS_LOG_DEBUG(" Bp: " << Bp);
ia = m_linkC[inface(mface)] - Bp * getQt(mface) / std::max((double)m_d[inface(mface)],(double)0.001);
// NS_LOG_DEBUG("Qt "<< getQt(mface) << " ia " << ia);
db = std::max(Np,(double)1);
long long Rp_base_n = std::min(std::max(ia / db,0.0),m_linkC[inface(mface)] / Np);
// NS_LOG_DEBUG(" Ppbn:" << Rp_base_n);
m_Rp_base[inface(mface)] = m_a * m_Rp_base[inface(mface)] + (1 - m_a) * Rp_base_n;
// NS_LOG_DEBUG(" Rpb: " << m_Rp_base[inface(mface)]);
ia = m_Yp[inface(mface)];
db = std::max(Np,(double)1);
long long Rp_ex_n = std::min(std::max((uint64_t)0,(uint64_t)(Rp - ia / db)),(uint64_t)(m_linkC[inface(mface)] - Rp_base_n));
m_Rp_ex[inface(mface)] = m_a * m_Rp_ex[inface(mface)] + (1 - m_a) * Rp_ex_n;
// NS_LOG_DEBUG(" Rpe: " << m_Rp_ex[inface(mface)]);
// NS_LOG_DEBUG(" !Ppne:" << Rp_ex_n << " " << (m_linkC[inface(mface)] / Np - Rp_base_n));
m_Yp_old[inface(mface)] = m_Yp[inface(mface)];
//Rs计算过程
uint64_t Cs = std::max(m_linkC[inface(mface)] - m_Yp[inface(mface)] ,(uint64_t)0);
// NS_LOG_DEBUG(" Cs: " << Cs << " Ys" << m_Ys[inface(mface)]);
//TODO: R(t-T) Rs? Rs+Rp?
double Ns = std::max((double)std::max(Cs,m_Ys[inface(mface)]) / std::max(Rp + Rs,1ll) ,1.0);
// NS_LOG_DEBUG(" Ns: " << Ns);
ia = m_Ys[inface(mface)] - m_Ys_old[inface(mface)];
ib = std::max(m_Ys[inface(mface)],(uint64_t)1);
double Bs = std::max(m_B,(double)ia / ib);
ia = Cs - Bs * getQt(mface) / std::max(m_d[inface(mface)],(double)0.001);
// NS_LOG_DEBUG(" ia: " << ia);
db = Ns;
uint64_t Rs_base_n = std::min((uint64_t)std::max(0.0,ia / db),uint64_t(Cs / Ns));
m_Rs_base[inface(mface)] = m_a * m_Rs_base[inface(mface)] + (1 - m_a) * Rs_base_n;
// NS_LOG_DEBUG(" Rsb: " << m_Rs_base[inface(mface)]);
ia = m_Ys[inface(mface)];
db = Ns;
//TODO:: excess rate < 0 ? >= 0 ?
uint64_t Rs_ex_n = std::min((uint64_t)std::max(0.0,Rs - ia / db),uint64_t(Cs - Rs_base_n));
m_Rs_ex[inface(mface)] = m_a * m_Rs_ex[inface(mface)] + (1 - m_a) * Rs_ex_n;
// NS_LOG_DEBUG(" Rse: " << m_Rs_ex[inface(mface)]);
m_Ys_old[inface(mface)] = m_Ys[inface(mface)];
m_Yp[inface(mface)] = 0;
m_Ys[inface(mface)] = 0;
}
uint64_t
MIRCCStrategy::getQt(FaceId mface)
{
//清除队列中过期兴趣包,避免造成干扰
//refreshQueueInterest(mface);
//返回兴趣包个数*兴趣包大小*数据包与兴趣包大小之比
uint64_t time = 10;//保持Rt计算周期一致
return (m_Queue[inface(mface)].qp.size() + m_Queue[inface(mface)].qs.size()) * DATA_SIZE * time;
//return (m_Queue[inface(mface)].f.getSize() + m_Queue[inface(mface)].s.getSize()) * DATA_SIZE * time;
}
void
MIRCCStrategy::pushQueueInterest(FaceId mface,const shared_ptr<Interest>& interest,const shared_ptr<pit::Entry>& pitEntry)
{
//// NS_LOG_DEBUG("MIRCC pushQueueInterest");
uint64_t p = *(interest->getTag<lp::PTag>());
//如果是第一类兴趣包
if (p == 1)
{
//如果队列已经满了
if (m_Queue[inface(mface)].qp.size() + m_Queue[inface(mface)].qs.size() >= MAX_QUEUE)
{
//如果存在第二类兴趣包可以删除
if (m_Queue[inface(mface)].qs.size() > 0)
{
//删掉第二类兴趣包最后一个,把当前第一类兴趣包加入
m_Queue[inface(mface)].qs.pop_back();
m_Queue[inface(mface)].ts.pop_back();
m_Queue[inface(mface)].ps.pop_back();
m_Queue[inface(mface)].qp.push_back(interest);
m_Queue[inface(mface)].tp.push_back(time::steady_clock::now());
m_Queue[inface(mface)].pp.push_back(pitEntry);
if (m_running[inface(mface)] == 0)
{
m_running[inface(mface)] = 1;
time::nanoseconds ttime(std::max((uint64_t)1,(uint64_t)1000000000 * DATA_SIZE / m_linkC[inface(mface)]));
getScheduler().schedule(ttime, bind(&MIRCCStrategy::sendQueueInterest, this, mface));
}
}else
{
//否则当前兴趣包丢失
//这里应该回传Nack,还没有写完
this->rejectPendingInterest(pitEntry);
}
}else
{
//直接将当前兴趣包加入第一类兴趣包队列末尾
m_Queue[inface(mface)].qp.push_back(interest);
m_Queue[inface(mface)].tp.push_back(time::steady_clock::now());
m_Queue[inface(mface)].pp.push_back(pitEntry);
if (m_running[inface(mface)] == 0)
{
m_running[inface(mface)] = 1;
time::nanoseconds ttime(std::max((uint64_t)1,(uint64_t)1000000000 * DATA_SIZE / m_linkC[inface(mface)]));
getScheduler().schedule(ttime, bind(&MIRCCStrategy::sendQueueInterest, this, mface));
}
}
}else
{
//如果队列已经满了
if (m_Queue[inface(mface)].qp.size() + m_Queue[inface(mface)].qs.size() >= MAX_QUEUE)
{
//当前兴趣包丢失
this->rejectPendingInterest(pitEntry);
//这里是否应该回传Nack不确定,需要回去看论文,第二类兴趣包处理方式似乎与第一类不同,还没有写完
}else
{
//将但钱兴趣包加入第二类末尾
m_Queue[inface(mface)].qs.push_back(interest);
m_Queue[inface(mface)].ts.push_back(time::steady_clock::now());
m_Queue[inface(mface)].ps.push_back(pitEntry);
if (m_running[inface(mface)] == 0)
{
m_running[inface(mface)] = 1;
time::nanoseconds ttime(std::max((uint64_t)1,(uint64_t)1000000000 * DATA_SIZE / m_linkC[inface(mface)]));
getScheduler().schedule(ttime, bind(&MIRCCStrategy::sendQueueInterest, this, mface));
}
}
}
/*
uint64_t p = *(interest->getTag<lp::PTag>());
//如果是第一类兴趣包
if (p == 1)
{
//如果队列已经满了
if (m_Queue[inface(mface)].f.getSize() + m_Queue[inface(mface)].s.getSize() >= 10)
{
//如果存在第二类兴趣包可以删除
if (m_Queue[inface(mface)].s.getSize() > 0)
{
//删掉第二类兴趣包最后一个,把当前第一类兴趣包加入
m_Queue[inface(mface)].s.deQueueBack();
m_Queue[inface(mface)].f.enQueue(interest,time::steady_clock::now(),pitEntry);
}else
{
//否则当前兴趣包丢失
//这里应该回传Nack,还没有写完
this->rejectPendingInterest(pitEntry);
}
}else
{
//直接将当前兴趣包加入第一类兴趣包队列末尾
m_Queue[inface(mface)].f.enQueue(interest,time::steady_clock::now(),pitEntry);
}
}else
{
//如果队列已经满了
if (m_Queue[inface(mface)].f.getSize() + m_Queue[inface(mface)].s.getSize() >= 10)
{
//当前兴趣包丢失
this->rejectPendingInterest(pitEntry);
//这里是否应该回传Nack不确定,需要回去看论文,第二类兴趣包处理方式似乎与第一类不同,还没有写完
}else
{
//将但钱兴趣包加入第二类末尾
m_Queue[inface(mface)].s.enQueue(interest,time::steady_clock::now(),pitEntry);
}
}*/
}
void
MIRCCStrategy::afterReceiveInterest(const FaceEndpoint &ingress, const Interest &interest, const shared_ptr<pit::Entry> &pitEntry)
{//目前在按照标识路径转发,采用MIRCC论文中的方法2
//// NS_LOG_DEBUG("MIRCC afterReceiveInterest");
if (hasPendingOutRecords(*pitEntry))
{
//已经有记录,等待即可。
return;
}
//避免巡回转发
const fib::Entry& fibEntry = this->lookupFib(*pitEntry);
fib::NextHopList outFaces = fibEntry.getNextHops();
for (auto it = outFaces.begin();it != outFaces.end();it++)
{
if (it->getFace().getId() == ingress.face.getId())
{
outFaces.erase(it);
break;
}
}
if (outFaces.size() != 0)
{
//根据hash值选择端口
uint64_t hashGuide = *interest.getTag<lp::PathidTag>();
uint64_t Facenum = hashGuide % outFaces.size();
hashGuide = hashGuide / outFaces.size();
interest.setTag(make_shared<lp::PathidTag>(hashGuide));
Face& outFace = outFaces[Facenum].getFace();
//更新Yt
uint64_t p = *interest.getTag<lp::PTag>();
uint64_t time = 10;//保持Rt计算周期一致
if (p == 1)
{
m_Yp[inface(outFace.getId())] += DATA_SIZE * time;
}else
{
m_Ys[inface(outFace.getId())] += DATA_SIZE * time;
}
//放入对应端口队列进行排队
this->pushQueueInterest(outFace.getId(), const_cast<Interest&>(interest).shared_from_this(), pitEntry);
return;
}
//如果发现没有端口可以转发,则设置立马删除该条pit
lp::NackHeader nackHeader;
nackHeader.setReason(lp::NackReason::NO_ROUTE);
this->sendNack(pitEntry, ingress, nackHeader);
this->rejectPendingInterest(pitEntry);
return;
}
void
MIRCCStrategy::afterReceiveNack(const FaceEndpoint &ingress, const lp::Nack &nack, const shared_ptr<pit::Entry> &pitEntry)
{
//// NS_LOG_DEBUG("MIRCC afterReceiveNack");
//nack无需处理
}
void
MIRCCStrategy::beforeSatisfyInterest(const shared_ptr<pit::Entry> &pitEntry, const FaceEndpoint &ingress, const Data &data)
{
//// NS_LOG_DEBUG("MIRCC beforeSatisfyInterest");
auto now = time::steady_clock::now();
//目前计算中,只考虑没有过期的并成功收到数据包的兴趣包对RTT和ratio的影响.超时的数据包是否对RTT有影响?。
for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
if (inRecord.getExpiry() > now)
{
if (inRecord.getFace().getId() == ingress.face.getId() && inRecord.getFace().getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC)
{
continue;
}
//计算收到数据包对RTT的影响。
//RTT暂时为固定值50ms
auto dura = now - inRecord.getLastRenewed();
// NS_LOG_DEBUG("RTT:" << dura.count());
m_d[inface(ingress.face.getId())] = m_rtt_a * m_d[inface(ingress.face.getId())] + (1.0 - m_rtt_a) * ((double)dura.count() / 1000000000);
}
}
}
void
MIRCCStrategy::afterReceiveData(const shared_ptr<pit::Entry> &pitEntry, const FaceEndpoint &ingress, const Data &data)
{
//// NS_LOG_DEBUG("MIRCC afterReceiveData");
if (ingress.face.getId() >= 257)
{
//更新data的Pathid
auto oldPathTag = data.getTag<lp::PathidTag>();
uint64_t hashVal;
if (oldPathTag != nullptr)
{
hashVal = *oldPathTag;
}else
{
hashVal = 0;
}
hashVal = (hashVal * 10 + (ingress.face.getId() - 257));
//给data设置新的hash值
data.setTag(make_shared<lp::PathidTag>(hashVal));
auto FaceRateTag = data.getTag<lp::PTag>();
if (FaceRateTag != nullptr)
{
m_linkC[inface(ingress.face.getId())] = m_miu * (*FaceRateTag);
}else
{
//认定为app线路,速率设置为100MB/s
m_linkC[inface(ingress.face.getId())] = m_miu * 8 * 100 * 1000 * 1000;
}
//修改Rt的值
auto oldRpTag = data.getTag<lp::RpTag>();
if (oldRpTag != nullptr)
{
uint64_t dataRp = *oldRpTag;
if (dataRp > m_Rp_base[inface(ingress.face.getId())] + m_Rp_ex[inface(ingress.face.getId())])
{
data.setTag(make_shared<lp::RpTag>(m_Rp_base[inface(ingress.face.getId())] + m_Rp_ex[inface(ingress.face.getId())]));
}
}else
{
data.setTag(make_shared<lp::RpTag>(m_Rp_base[inface(ingress.face.getId())] + m_Rp_ex[inface(ingress.face.getId())]));
}
//修改Rs的值
auto oldRsTag = data.getTag<lp::RsTag>();
if (oldRsTag != nullptr)
{
uint64_t dataRs =*oldRsTag;
if (dataRs > m_Rs_base[inface(ingress.face.getId())] + m_Rs_ex[inface(ingress.face.getId())])
{
data.setTag(make_shared<lp::RsTag>(m_Rs_base[inface(ingress.face.getId())] + m_Rs_ex[inface(ingress.face.getId())]));
}
}else
{
data.setTag(make_shared<lp::RsTag>(m_Rs_base[inface(ingress.face.getId())] + m_Rs_ex[inface(ingress.face.getId())]));
}
}
beforeSatisfyInterest(pitEntry, ingress, data);
sendDataToAll(pitEntry, ingress, data);
}
}
}