1
0
mirror of https://gitee.com/willfree/mlsr.git synced 2026-06-15 18:44:53 +08:00

add: SyncLogicHandler 以及 SyncProtocolAdapter 实现

This commit is contained in:
wzhq
2022-08-10 23:05:47 +08:00
parent b5e61066f4
commit eecb522c2e
2 changed files with 192 additions and 15 deletions
+153 -11
View File
@@ -10,11 +10,13 @@ package communication
import (
"errors"
common2 "minlib/common"
"minlib/component"
"minlib/logicface"
"minlib/utils"
mutil "minlib/utils"
"mlsr/common"
"mlsr/lsa"
"mlsr/utils"
"strconv"
)
@@ -30,7 +32,7 @@ import (
//
// incomingLogicFaceId LSA到来的LogicFaceId
//
type IsLsaNew = func(identifier *component.Identifier, lsaType *lsa.LsaType, seq uint64, incomingLogicFaceId uint64) bool
type IsLsaNew = func(identifier *component.Identifier, lsaType lsa.LsaType, seq uint64, incomingLogicFaceId uint64) bool
// SyncLogicHandler MLSR-to-MINSync 中间层
//
@@ -47,13 +49,13 @@ type SyncLogicHandler struct {
isLsaNew IsLsaNew
// confParam MLSR配置参数
confParam *common.MlsrConfigParameters
// nlsrComponent NLSR 字符串标识组件
nlsrComponent *component.IdentifierComponent
// lsaComponent LSA 字符串标识组件
lsaComponent *component.IdentifierComponent
// nlsrComponent NLSR字符串
nlsrComponent string
// lsaComponent LSA字符串
lsaComponent string
/////////////// 公有属性 /////////////
// OnNewLsa 当新LSA到来时的所触发的信号
OnNewLsa *utils.Signal
OnNewLsa *mutil.Signal
// NameLsaUserPrefix NameLsa前缀
NameLsaUserPrefix *component.Identifier
// AdjLsaUserPrefix AdjLsa前缀
@@ -64,6 +66,138 @@ type SyncLogicHandler struct {
SyncLogic *SyncProtocolAdapter
}
// ProcessUpdate 处理到达该路由节点的新同步数据
//
// 参数说明:
//
// updateName: 更新的数据前缀
//
// highSeq: 更新的数据版本
//
// incomingFaceId: 该同步数据到达的LogicFaceId
//
func (s *SyncLogicHandler) ProcessUpdate(updateName *component.Identifier, highSeq, incomingFaceId uint64) {
common2.LogInfo("Update Name: ", updateName.ToUri(), " Seq no: ", highSeq)
// nlsrPosition 从同步数据的标识中获取子字符串'nlsr'的位置
nlsrPosition := utils.GetNameComponentPosition(updateName, s.nlsrComponent)
// lsaPosition 从同步数据的标识中获取子字符串'lsa'的位置
lsaPosition := utils.GetNameComponentPosition(updateName, s.lsaComponent)
if nlsrPosition < 0 || lsaPosition < 0 {
common2.LogWarn("收到了不合法的同步更新数据")
return
}
// networkName 用于表示源MLSR节点网络标识
networkName, err := updateName.GetSubIdentifier(1, nlsrPosition-1)
if err != nil {
common2.LogError("解析源MLSR节点网络标识出错", err.Error())
return
}
networkNameBlock, err := networkName.SelfWireEncode(networkName)
// routerName 用于表示源MLSR节点路由器标识
// NLSR源代码:updateName.getSubName(lsaPosition + 1).getPrefix(-1);
// 意味着取updateName的第[lsaPosition+1, updateName.Size()-2]个组件组成标识
// 等价于从第lsaPosition+1开始取updateName.Size()-lsaPosition-2个
routerName, err := updateName.GetSubIdentifier(lsaPosition+1, updateName.Size()-lsaPosition-2)
if err != nil {
common2.LogError("解析源MLSR节点路由器标识出错", err.Error())
return
}
routerNameBlock, err := routerName.SelfWireEncode(routerName)
if err != nil {
common2.LogError(err.Error())
return
}
routerNameCopy, err := component.NewIdentifierByBlock(routerNameBlock)
if err != nil {
common2.LogError(err.Error())
return
}
// 将networkName和routerName组合成originRouterName
originRouterName, err := component.NewIdentifierByBlock(networkNameBlock)
if err != nil {
common2.LogError(err.Error())
return
}
for _, comp := range routerNameCopy.GetComponents() {
originRouterName.Append(comp)
}
}
// ProcessUpdateFromSync 处理更新的回调函数
//
// 主要用来确定更新的LSA类型并拉取它的载荷
//
// 首先检查收到的更新是否是我们自身发布的,这是可能发生的,
// 原因可以查看MINSync全同步逻辑中的TestMultipleNodesSimultaneousPublish测试用例
//
// 随后检查更新的类型,确定该更新信息用于更新哪类LSA
//
// 最后转发兴趣包拉取该LSA对应的载荷
//
func (s *SyncLogicHandler) ProcessUpdateFromSync(originRouter, updateName *component.Identifier, seqNo, incomingFaceId uint64) {
common2.LogDebug("Origin Router of update: ", originRouter.ToUri())
// 先校验该LSA更新是否由本地路由器发布,一个路由器不应该拉取自己发布的LSA
if originRouter.ToUri() != s.confParam.GetRouterPrefix().ToUri() {
// 校验LSA类型
var lsaType lsa.LsaType
lsaTypeComp, err := updateName.Get(-1)
if err != nil {
common2.LogError(err.Error())
return
}
// 将lsaTypeComp字符串转换为LSAType
// 先转为int,再转为LSAType
lsaTypeInt, err := strconv.Atoi(lsaTypeComp.ToUri())
if err != nil {
common2.LogError(err.Error())
return
}
lsaType = lsa.LsaType(lsaTypeInt)
// 判断是否Lsa更新
if s.isLsaNew(originRouter, lsaType, seqNo, incomingFaceId) {
// 双曲路由特性未实现,这里先不判断
//if lsaType == lsa.LsaADJACENCYType && seqNo != 0 {
// common2.LogError()
// return
//}
//if (lsaType == Lsa::Type::COORDINATE && seqNo != 0 &&
// m_confParam.getHyperbolicState() == HYPERBOLIC_STATE_OFF) {
// NLSR_LOG_ERROR("Got an update for coordinate LSA when link-state " <<
// "is enabled. Not going to fetch.");
// return;
//}
// 触发OnNewLsa信号,通知MLSR新的LSA到来
s.OnNewLsa.Emit(updateName, seqNo, originRouter, incomingFaceId)
}
}
}
// PublishRoutingUpdate 通知MINSync向其它节点发布更新
//
// 序列号seqNo取决于调用该方法时sequencing manager的状态
//
// 发布更新时会这样做可以满足其他路由器的Sync兴趣包,让它们知道有同步更新可用
//
func (s *SyncLogicHandler) PublishRoutingUpdate(t lsa.LsaType, seqNo uint64) {
switch t {
case lsa.LsaADJACENCYType:
s.SyncLogic.PublishUpdate(s.AdjLsaUserPrefix, seqNo)
case lsa.LsaCOORDINATEType:
s.SyncLogic.PublishUpdate(s.CoorLsaUserPrefix, seqNo)
case lsa.LsaNAMEType:
s.SyncLogic.PublishUpdate(s.NameLsaUserPrefix, seqNo)
}
}
// NewSyncLogicHandler SyncLogicHandler的工厂方法
//
func NewSyncLogicHandler(face *logicface.DummyClientLogicFace, isLsaNew IsLsaNew, conf *common.MlsrConfigParameters) (*SyncLogicHandler, error) {
@@ -73,14 +207,14 @@ func NewSyncLogicHandler(face *logicface.DummyClientLogicFace, isLsaNew IsLsaNew
// 基础参数
syncLogicHandler := new(SyncLogicHandler)
syncLogicHandler.OnNewLsa = utils.NewSignal()
syncLogicHandler.OnNewLsa = mutil.NewSignal()
syncLogicHandler.syncFace = face
syncLogicHandler.isLsaNew = isLsaNew
syncLogicHandler.confParam = conf
// 标识组件构造
syncLogicHandler.nlsrComponent = component.CreateIdentifierComponentByString("nlsr")
syncLogicHandler.lsaComponent = component.CreateIdentifierComponentByString("lsa")
syncLogicHandler.nlsrComponent = "nlsr"
syncLogicHandler.lsaComponent = "lsa"
// NLSR的源码中使用了字符串类型的标识组件, 而不是非负数类型
nameComponent := component.CreateIdentifierComponentByString(strconv.Itoa(int(lsa.LsaNAMEType)))
adjComponent := component.CreateIdentifierComponentByString(strconv.Itoa(int(lsa.LsaADJACENCYType)))
@@ -118,7 +252,15 @@ func NewSyncLogicHandler(face *logicface.DummyClientLogicFace, isLsaNew IsLsaNew
// if conf.GetHyperbolicState()
// 同步逻辑实例
syncLogicHandler.SyncLogic = NewSyncProtocolAdapter()
syncLogicHandler.SyncLogic, err = NewSyncProtocolAdapter(face,
// 公共的SyncPrefix
conf.GetSyncPrefix(),
// userPrefix
syncLogicHandler.NameLsaUserPrefix,
// syncInterestLifetime
conf.GetSyncInterestLifetime(),
// 处理更新同步数据的回调函数
syncLogicHandler.ProcessUpdate)
return syncLogicHandler, nil
}
+39 -4
View File
@@ -10,7 +10,11 @@ package communication
import (
"MINSync"
"errors"
"minlib/common"
"minlib/component"
"minlib/logicface"
"time"
)
// SyncUpdateCallback 当同步协议获取到其它节点的更新数据时
@@ -45,7 +49,7 @@ func (s *SyncProtocolAdapter) AddUserNode(userPrefix *component.Identifier) {
// MLSR 有着自己一套的序列号管理规则,因此每次发布更新都将强制更新生产者的序列号
//
func (s *SyncProtocolAdapter) PublishUpdate(userPrefix *component.Identifier, seq uint64) {
s.minSyncLogic.PublishName(userPrefix, int64(seq))
}
// OnMINSyncUpdate 每当MINSync获取到新同步数据时便会调用的回调函数
@@ -56,10 +60,41 @@ func (s *SyncProtocolAdapter) PublishUpdate(userPrefix *component.Identifier, se
//
// updates: MissingDataInfo的列表
//
func (s *SyncProtocolAdapter) OnMINSyncUpdate(updates MINSync.MissingDataInfo) {
func (s *SyncProtocolAdapter) OnMINSyncUpdate(updates []*MINSync.MissingDataInfo) {
common.LogInfo("Received PSync update event")
for _, update := range updates {
s.syncUpdateCallback(update.Prefix, update.HighSeq, update.IncomingFace)
}
}
func NewSyncProtocolAdapter() *SyncProtocolAdapter {
panic("Implement me")
// NewSyncProtocolAdapter SyncProtocolAdapter的工厂方法
//
func NewSyncProtocolAdapter(face *logicface.DummyClientLogicFace,
syncPrefix, userPrefix *component.Identifier,
syncInterestLifetime time.Duration,
syncUpdateCallback SyncUpdateCallback) (*SyncProtocolAdapter, error) {
if face == nil || syncPrefix == nil || userPrefix == nil || syncUpdateCallback == nil {
return nil, errors.New("无效的输入参数")
}
common.LogInfo("Using PSync")
syncProtocolAdapter := new(SyncProtocolAdapter)
syncProtocolAdapter.syncUpdateCallback = syncUpdateCallback
// 根据PSync源码来说,默认将Sync Data的生命周期设为SYNC_REPLY_FRESHNESS
// 魔数 80 意味着两个路由节点的数据前缀差集最大在80条左右
fullProducer, err := MINSync.NewFullProducer(
80,
face,
syncPrefix, userPrefix,
syncProtocolAdapter.OnMINSyncUpdate,
syncInterestLifetime, MINSync.SYNC_REPLY_FRESHNESS)
if err != nil {
return nil, err
}
syncProtocolAdapter.minSyncLogic = fullProducer
return syncProtocolAdapter, nil
}