diff --git a/scenario/extensions/mircc-strategy.cpp b/scenario/extensions/mircc-strategy.cpp new file mode 100644 index 0000000..bced3a6 --- /dev/null +++ b/scenario/extensions/mircc-strategy.cpp @@ -0,0 +1,376 @@ +#include "mircc-strategy.hpp" + +namespace nfd { + namespace fw { + + MIRCCStrategy::MIRCCStrategy(Forwarder &forwarder, const Name &name = getStrategyName()) + : Strategy(forwarder), ProcessNackTraits(this) + { + this->setInstanceName(makeInstanceName(name, getStrategyName())); + + FaceTable& faceTable = m_forwarder.m_faceTable; + for (auto i = faceTable.begin();i != faceTable.end();i++) + { + m_Rp_base[i] = getMaxR(); + m_Rp_ex[i] = 0; + m_Rs_base[i] = 0; + m_Rs_ex[i] = 0; + + m_Yp[i] = 0; + m_Yp_old[i] = 0; + m_Ys[i] = 0; + m_Ys_old[i] = 0; + + m_d[i] = 500; + m_ratio[i] = 3000/INTEREST_SIZE; + m_linkC[i] = 10 * 1000 * 1000 * 8; + sendInterest(i); + calcRt(i); + } + + //初始化hash + hashUnit[0] = 1; + for (int i = 1;i < 100;i++) + { + hashUnit[i] = hashUnit[i - 1] * hashNum % hashMo; + } + //初始化逆元 + invMod = fpow(hashNum,hashMo - 2,hashMo); + + + } + + void + MIRCCStrategy::sendQueueInterest(FaceEndpoint &ingress) + { + //设置下一次发送,根据兴趣包与数据包的大小之比,计算出兴趣包可用带宽,进而计算出每发送一个兴趣包需要的时间,最小为1ms + getScheduler().schedule(time::milliseconds(max(1,INTEREST_SIZE / (m_linkC[ingress] * (1.0 / (m_ratio[ingress] + 1))))), std::bind(&MIRCCStrategy::sendQueueInterest, this, ingress)); + + //清空队列中过期的兴趣包 + refreshQueueInterest(ingress); + + //分三种情况讨论,只有第一类兴趣包可以发,只有第二类兴趣包可以发,两类可以发(选择最早到达的) + if (m_Queue[ingress].qp.size() > 0 and m_Queue[ingress].qs.size() == 0) + { + //发送兴趣包 + sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qp.front()); + //这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能 + m_Queue[ingress].qp.pop_front(); + m_Queue[ingress].tp.pop_front(); + + }else if (m_Queue[ingress].qs.size() > 0 and m_Queue[ingress].qp.size() == 0) + { + //发送兴趣包 + sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qs.front()); + //这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能 + m_Queue[ingress].qs.pop_front(); + m_Queue[ingress].ts.pop_front(); + }else + { + //如果第一类兴趣包更早到达 + if (m_Queue[ingress].tp.front() < m_Queue[ingress].ts.front()) + { + //发送兴趣包 + sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qp.front()); + //这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能 + m_Queue[ingress].qp.pop_front(); + m_Queue[ingress].tp.pop_front(); + }else + { + //发送兴趣包 + sendInterest(m_forwarder.m_pit,ingress,m_Queue[ingress].qs.front()); + //这个位置是不是要设置pit记录的过期时间?似乎sendInterest里面实现了这个功能 + m_Queue[ingress].qs.pop_front(); + m_Queue[ingress].ts.pop_front(); + } + } + } + + + void + MIRCCStrategy::calcRt(FaceEndpoint &ingress) + { + //每100ms自动重新计算Rt + getScheduler().schedule(time::milliseconds(100), std::bind(&MIRCCStrategy::calcRt, this, ingress)); + + uint64_t ia,ib,Rp,Rs; + Rp = m_Rp_base[ingress] + m_Rp_ex[ingress]; + Rs = m_Rs_base[ingress] + m_Rs_ex[ingress]; + + //Rp计算过程 + ia = std::max(m_linkC[ingress],m_Yp[ingress]); + ib = std::max(Rp,1); + uint64_t Np = ia / ib; + + ia = m_Yp[ingress] - m_Yp_old[ingress]; + ib = std::max(m_Yp[ingress],1); + double Bp = std::max(m_B,(double)ia / ib); + + ia = m_miu * m_linkC[ingress] - Bp * getQt() / std::max(m_d[ingress],0.001); + ib = std::max(Np,1); + m_Rp_base[ingress] = m_a * m_Rp_base[ingress] + (1 - m_a) * ia / ib; + + ia = m_Yp[ingress]; + ib = std::max(Np,1); + m_Rp_ex[ingress] = m_a * m_Rp_ex[ingress] + (1 - m_a) * (Rp - ia / ib); + + m_Yp_old[ingress] = m_Yp[ingress]; + + //Rs计算过程 + uint64_t Cs = std::max(m_linkC[ingress] - m_Yp[ingress],0); + uint64_t Ns = std::max(max(Cs,m_Ys[ingress]) / Rs,1); + + ia = m_Ys[ingress] - m_Ys_old[ingress]; + ib = std::max(m_Ys[ingress],1); + double Bs = std::max(m_B,(double)ia / ib); + + ia = m_miu * Cs - Bs * getQt() / std::max(m_d[ingress],0.001); + ib = std::max(Ns,1); + m_Rs_base[ingress] = m_a * m_Rs_base[ingress] + (1 - m_a) * ia / i b; + + ia = m_Ys[ingress]; + ib = std;:max(Ns,1); + m_Rs_ex[ingress] = m_a * m_Rs_ex[ingress] + (1 - m_a) * (Rs - ia / ib); + + m_Ys_old[ingress] = m_Ys[ingress]; + + m_Yp[ingress] = 0; + m_Ys[ingress] = 0; + + + } +//在队列大小限制范围内的兴趣包,转发过程不会超时,这个功能暂时没用。 +/* + void + MIRCCStrategy::refreshQueueInterest(FaceEndpoint &ingress) + { + //清空face队列中过期的兴趣包 + auto now = time::steady_clock::now(); + + while (m_Queue[ingress].qp.size() > 0 and m_Queue[ingress].tp.front() < now) + { + m_Queue[ingress].qp.pop_front(); + m_Queue[ingress].tp.pop_front(); + } + + while (m_Queue[ingress].qs.size() > 0 and m_Queue[ingress].ts.front() < now) + { + m_Queue[ingress].qs.pop_front(); + m_Queue[ingress].ts.pop_front(); + } + } +*/ + + uint64_t + MIRCCStrategy::getQt(FaceEndpoint &ingress) + { + //清除队列中过期兴趣包,避免造成干扰 + //refreshQueueInterest(ingress); + //返回兴趣包个数*兴趣包大小*数据包与兴趣包大小之比 + return (m_Queue[ingress].qp.size() + m_Queue[ingress].qs.size()) * INTEREST_SIZE * m_ratio[ingress]; + } + + void + MIRCCStrategy::pushQueueInterest(FaceEndpoint &ingress, const Interest &interest) + { + //更新Yt + updateYt(ingress,interest); + + uint64_t p = interest.getTag(); + //如果是第一类兴趣包 + if (p == 1) + { + //如果队列已经满了 + if (m_Queue[ingress].qp.size() + m_Queue[ingress].qs.size() >= QUEUE_MAX) + { + //如果存在第二类兴趣包可以删除 + if (m_Queue[ingress].qs.size() > 0) + { + //删掉第二类兴趣包最后一个,把当前第一类兴趣包加入 + m_Queue[ingress].qs.pop_back(); + m_Queue[ingress].ts.pop_back(); + m_Queue[ingress].qp.push_back(interest); + m_Queue[ingress].tp.push_back(time::steady_clock::now()); + }else + { + //否则当前兴趣包丢失 + + //这里应该回传Nack,还没有写完 + } + }else + { + //直接将当前兴趣包加入第一类兴趣包队列末尾 + m_Queue[ingress].qp.push_back(interest); + m_Queue[ingress].tp.push_back(time::steady_clock::now()); + } + }else + { + //如果队列已经满了 + if (m_Queue[ingress].qp.size() + m_Queue[ingress].qs.size() >= QUEUE_MAX) + { + //当前兴趣包丢失 + + //这里是否应该回传Nack不确定,需要回去看论文,第二类兴趣包处理方式似乎与第一类不同,还没有写完 + }else + { + //将但钱兴趣包加入第二类末尾 + m_Queue[ingress].qs.push_back(interest); + m_Queue[ingress].ts.push_back(time::steady_clock::now()); + } + } + } + + void + MIRCCStrategy::updateYt(FaceEndpoint &ingress, const Interest &interest) + { + uint64_t p = interest.getTag(); + if (p == 1) + { + m_Yp[ingress] += INTEREST_SIZE * m_ratio[ingress]; + }else + { + m_Ys[ingress] += INTEREST_SIZE * m_ratio[ingress]; + } + } + + void + MIRCCStrategy::afterReceiveInterest(const FaceEndpoint &ingress, const Interest &interest, const shared_ptr &pitEntry) + {//目前在按照标识路径转发,采用MIRCC论文中的方法2 + + + //MIRCC似乎不需要判断isNextHopEligible??? + //稍微有点不理解Out-record在干啥 + if (hasPendingOutRecords(*pitEntry)) + { + //已经有记录,等待即可。 + //这个位置要不要在其他路径继续转发(比如pathid不相同的情况),不太确定 + //我记得原文是说不考虑任何的中间的cs命中之类的情况 + return; + } + + //根据hash值获取一个参考端口号,还需要根据可用端口数量进一步取模来最终选取转发端口。 + uint64_t tmpFace = chooseFaceForInterestByHash(interest); + + const fib::Entry& fibEntry = this->lookupFib(*pitEntry); + + NextHopList outFaces = fibEntry.getNextHops(); + + if (outFace.size() != 0) + {//如果存在端口可以转发 + uint64_t choosedFace = tmpFace % outFace.size(); + Face& outFace = choosedFace[choosedFace].getFace(); + //放入对应端口队列进行排队 + this->pushQueueInterest(pitEntry, FaceEndpoint(outFace, 0), interest); + return; + } + + //如果发现没有端口可以转发,则设置立马删除该条pit + //后续Nack处理是否得当并不清楚,还需要研究 + lp::NackHeader nackHeader; + nackHeader.setReason(lp::NackReason::NO_ROUTE); + this->sendNack(pitEntry, ingress, nackHeader); + this->rejectPendingInterest(pitEntry); + return; + } + + void + MIRCCStrategy::afterReceiveNack(const FaceEndpoint &ingress, const lp::Nack &nack, const shared_ptr &pitEntry) + { + //nack具体怎么处理还需要研究 + //nack应该利用预留给data的带宽,返回给consumer + //Forwarder::onIncomingNack似乎已经把Nack返回给consumer了 + } + + void + MIRCCStrategy::beforeSatisfyInterest(const shared_ptr &pitEntry, const FaceEndpoint &ingress, const Data &data) + { + auto now = time::steady_clock::now(); + uint64_t rcd_cnt = 0; + //目前计算中,只考虑没有过期的并成功收到数据包的兴趣包对RTT和ratio的影响,超时的数据包是否对RTT有影响?。 + for (const pit::InRecord& inRecord : pitEntry->getInRecords()) { + if (inRecord.getExpiry() > now) + { + if (inRecord.getFace().getId() == ingress.face.getId() && inRecord.getFace().getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) + { + continue; + } + + //计算收到数据包对数据包/兴趣包大小比率的影响 + m_ratio[ingress] = m_ratio_a * m_ratio[ingress] + (1 - m_ratio_a) * (data.getContent().size() / INTEREST_SIZE); + + //计算收到数据包对RTT的影响。 + uint64_t tmpRTT = time::steady_clock::now() - inRecord.getLastRenewed();//不确定正确性,可能还要继续考虑 + m_d[ingress] = m_rtt_a * m_d[ingress] + (1 - m_rtt_a) * tmpRTT; + + } + } + } + + + void + MIRCCStrategy::fastpow(int64_t a,int64_t k,int64_t p) + { + int64_t = 1; + while (k != 0) + { + if (k % 2 != 0) + { + res = res * a % p; + } + k >>= 1; + a = a * a % p; + } + return res; + } + + void + MIRCCStrategy::setHashForData(const FaceEndpoint &ingress, const Data &data) + {//注意!!HopCountTag的工作情况目前不太了解,似乎ndnsim这个Tag没做处理,后续需要考虑 + //处理路径标识id + uint64_t hashOld = data.getTag(); + //uint64_t hashNew = (hashOld + hashUnit[data.getTag()] * ingress.getFace().getId() % hashMo) % hashMo; + //靠近producer的为hash值的高位 + uint64_t hashNew = (hashOld * hashNum % hashMo + ingress.getFace().getId() % hashMo) % hashMo; + //给data设置新的hash值 + data.setTag(make_shared(hashNew)); + + + } + + uint64_t + MIRCCStrategy::chooseFaceForInterestByHash(const Interest &interest) + { + uint64_t hashGuide = data.getTag(); + uint64_t Faceid = hashGuide % hashMo % hashNum; + return Faceid; + } + + void + MIRCCStrategy::afterReceiveData(const shared_ptr &pitEntry, const FaceEndpoint &ingress, const Data &data) + { + setHashForData(ingress, data) + + //修改Rt的值 + uint64_t dataRp = data.getTag(); + uint64_t dataRs = data.getTag(); + + if (dataRp > m_Rp_base[ingress] + m_Rp_ex[ingress]) + { + data.setTag(make_shared(m_Rp_base[ingress] + m_Rp_ex[ingress])); + } + + if (dataRs > m_Rs_base[ingress] + m_Rs_ex[ingress]) + { + data.setTag(make_shared(m_Rs_base[ingress] + m_Rs_ex[ingress])); + } + + + + beforeSatisfyInterest(pitEntry, ingress, data); + + sendDataToAll(pitEntry, ingress, data); + + } + + } +} \ No newline at end of file diff --git a/scenario/extensions/mircc-strategy.hpp b/scenario/extensions/mircc-strategy.hpp new file mode 100644 index 0000000..7e76b02 --- /dev/null +++ b/scenario/extensions/mircc-strategy.hpp @@ -0,0 +1,124 @@ + +#include "face/face.hpp" +#include "fw/strategy.hpp" +#include "fw/algorithm.hpp" +#include "fw/process-nack-traits.hpp" +#include +#include +#include +namespace nfd { + namespace fw { + + + class MIRCCQueue + { + public: + std:: deque qp,qs; + //记录的是过期时间,非达到时间 + std:: deque tp,ts; + + }; + + /* + MIRCC 策略实现 + */ + class MIRCCStrategy : public Strategy, public ProcessNackTraits + { + public: + explicit MIRCCStrategy(Forwarder &forwarder, const Name &name = getStrategyName()); + + ~MIRCCStrategy() override = default; + + void + afterReceiveInterest(const FaceEndpoint &ingress, const Interest &interest,const shared_ptr &pitEntry) override; + + void + afterReceiveNack(const FaceEndpoint &ingress, const lp::Nack &nack,const shared_ptr &pitEntry) override; + + void + beforeSatisfyInterest(const shared_ptr &pitEntry,const FaceEndpoint &ingress, const Data &data) override; + + void + afterReceiveData(const shared_ptr &pitEntry,const FaceEndpoint &ingress, const Data &data) override; + + /** + * 获取最大的目标速率 + * @return + */ + static uint64_t + getMaxR() + { + return std::numeric_limits::max(); + } + + static const Name & + getStrategyName(); + + protected: + friend ProcessNackTraits; + + private: + unordered_map m_Rp_base;//Rp的base速率 + unordered_map m_Rp_ex;//Rp的ex速率 + unordered_map m_Rs_base;//Rs的base速率 + unordered_map m_Rs_ex;//Rs的ex速率 + unordered_map m_Yp;//本周期的Yp(t) + unordered_map m_Yp_old;//上一个周期的Yp(t) + unordered_map m_Ys;//本周起的Ys(t) + unordered_map m_Ys_old;//上一个周期的Ys(t) + unordered_map m_d;//维护每一个face的RTT + unorderer_map m_Queue;//用于出口排队转发兴趣包 + unordered_map m_ratio;//维护每一个face的Data/Interest + //这个大小之比要按照第一类,第二类进行分类么? + unordered_map m_linkC;//每一个链路的带宽,目前不知道怎么获取,都设置为10 * 1000 * 1000 * 8 + + //通过兴趣包标注的path来获得对应的转发端口 + //采用MIRCC论文中提出的第二个方案,由consumer随机生成hash值来引导转发,路由器无需在记录对应关系 + //unorderer_map pathid2face; + + //兴趣包大小常量 + const unint64_t INTEREST_SIZE = 300; + //队列大小常量 + const unint64_t QUEUE_MAX = 1000; + + const double m_B = 0.1,m_a = 0.5,m_miu = 0.95,m_ratio_a = 0.99 ,m_rtt_a = 0.99; + + const uint64_t = hashUnit[100],hashMo = 998244353,hashNum = 37,invMod = 0; + + + //发送对应端口1个正在排队的兴趣包,并设置下一次发送时间 + void + sendQueueInterest(FaceEndpoint &ingress); + + //清空对应端口队列中所有过期的兴趣包 + void + refreshQueueInterest(FaceEndpoint &ingress); + + //将兴趣包放入对应端口的队列 + void + pushQueueInterest(FaceEndpoint &ingress, const Interest &interest); + + //计算并返回q(t) + uint64_t + getQt(); + + //计算Rt + void + calcRt(FaceEndpoint &ingress); + + //更新Yt + void + updateYt(FaceEndpoint &ingress, const Interest &interest); + + void + setHashForData(const FaceEndpoint &ingress, const Data &data); + + uint64_t + fastpow(int64_t a,int64_t k,int64_t p); + }; + } +} + + + +