增加sendQuickCPacketV3(),减少发包阻塞

This commit is contained in:
free will
2021-10-26 20:52:52 +08:00
parent e231aa8661
commit 2a30a11498
5 changed files with 99 additions and 18 deletions
+46 -9
View File
@@ -256,26 +256,63 @@ public class LinkService {
return false;
}
} catch (LogicFaceException e) {
throw new LogicFaceException("LinkService.sendCPacket: "+e.getMessage());
throw new LogicFaceException("LinkService.sendQuickCPacket: "+e.getMessage());
}
}
/**
* 将已经编码好的CPacket变成LpPacket(不分包),然后快速发出去
* @param encodedBytes
* @return
* @throws LogicFaceException
*/
public boolean sendQuickCPacketV3(byte[] encodedBytes) throws LogicFaceException {
try {
LpPacket lpPacket=getLpPacketFromQuickCPacket(encodedBytes);
if(lpPacket!=null) {
return this.transport.send(lpPacket);
}else {
return false;
}
} catch (LogicFaceException e) {
throw new LogicFaceException("LinkService.sendQuickCPacketV3: "+e.getMessage());
}
}
private LpPacket getLpPacketFromQuickCPacket(byte[] encodedBytes) throws LogicFaceException {
try {
// 1.byte[](CPacket编码而成) => LpPacket
LpPacket lpPacket=new LpPacket();
lpPacket.setId(this.lpPacketId);
lpPacket.setFragmentNum(1); //分片数
lpPacket.setFragmentSeq(0); //片序号
lpPacket.setValue(encodedBytes);
this.lpPacketId++;
return lpPacket;
// } catch (PacketException | EncoderException e) {
} catch (Exception e) {
throw new LogicFaceException("LinkService.getMINPacketFromLpPacket: "+e.getMessage());
}
}
private LpPacket getLpPacketFromCPacket(CPacket cPacket) throws LogicFaceException {
try {
// // 1.CPacket => byte[]
// Encoder encoder=new Encoder();
// if(!encoder.encoderReset(new SizeT(Encoder.MaxPacketSize),new SizeT(0))){
// return null;
// }
// int bufLen=cPacket.wireEncode(encoder);
// byte[] buf=encoder.getBuffer();
// 1.CPacket => byte[]
Encoder encoder=new Encoder();
if(!encoder.encoderReset(new SizeT(Encoder.MaxPacketSize),new SizeT(0))){
return null;
}
int bufLen=cPacket.wireEncode(encoder);
byte[] buf=encoder.getBuffer();
// 2.byte[] => LpPacket
LpPacket lpPacket=new LpPacket();
lpPacket.setId(this.lpPacketId);
lpPacket.setFragmentNum(1); //分片数
lpPacket.setFragmentSeq(0); //片序号
lpPacket.setValue(cpacketBytes);
// lpPacket.setValue(cpacketBytes);
lpPacket.setValue(buf);
this.lpPacketId++;
return lpPacket;
+12 -2
View File
@@ -303,15 +303,25 @@ public class LogicFace {
}
/**
* 快发包(暂时仅支持TCP:减少包过程,迅速将包发送出去
* 快发包:减少包过程,迅速将包发送出去
* @param cPacket
* @return
* @throws LogicFaceException
*/
public boolean sendQuickCPacketByTCP(CPacket cPacket) throws LogicFaceException {
public boolean sendQuickCPacket(CPacket cPacket) throws LogicFaceException {
return this.linkService.sendQuickCPacket(cPacket);
}
/**
* 快发包V3:减少TLV编码及分包过程
* @param encodedBytes
* @return
* @throws LogicFaceException
*/
public boolean sendQuickCPacketV3(byte[] encodedBytes) throws LogicFaceException {
return this.linkService.sendQuickCPacketV3(encodedBytes);
}
/**
* 发送一个兴趣包
* @param interest
+28
View File
@@ -40,6 +40,7 @@ public class CPacket implements InteractWithField, IEncodingAble {
public Payload payload=new Payload();
public Identifier srcIdentifier=new Identifier();
public Identifier dstIdentifier=new Identifier();
public byte[] rawData=null; // 放置cpacket已经编码后的数据
public CPacket(){
}
@@ -52,6 +53,33 @@ public class CPacket implements InteractWithField, IEncodingAble {
this.ttl=ttl;
}
public byte[] getRawData(){
return rawData;
}
/**
* 编码CPacket
* @return
*/
public boolean encodeSelf(){
try {
Encoder encoder=new Encoder();
if(!encoder.encoderReset(new SizeT(Encoder.MaxPacketSize),new SizeT(0))){
return false;
}
int bufLen=this.wireEncode(encoder);
if(bufLen<0){
return false;
}
this.rawData=encoder.getBuffer();
return true;
} catch (EncoderException | PacketException e) {
e.printStackTrace();
return false;
}
}
public CPacket createCPacketByMINPacket(MINPacket minPacket) throws PacketException {
try {
CPacket cPacket=new CPacket();
@@ -51,7 +51,7 @@ public class RegisterPrefixHelperTest {
byte[] pay=new byte[1000];
cPacket.payload.setValue(pay);
ConcurrentLinkedQueue<CPacket> readTunCPackets = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 100000; i++) {
for (int i = 0; i < 10000*1000; i++) {
readTunCPackets.offer(cPacket);
}
@@ -63,8 +63,8 @@ public class RegisterPrefixHelperTest {
// 出队
CPacket newPacket = readTunCPackets.poll();
if(newPacket!=null) {
face.sendQuickCPacketByTCP(newPacket);
// face.sendCPacket(newPacket);
// face.sendQuickCPacketByTCP(newPacket);
face.sendCPacket(newPacket);
}
}
System.out.println("nums: "+nums);
@@ -76,16 +76,18 @@ public class RegisterPrefixHelperTest {
@Test
public void registerPrefixByTest() throws Exception {
// 注册前缀Identifier
Identifier identifier=new Identifier("/min/gdcni9/wefree/"
+System.currentTimeMillis());
Identifier identifier=new Identifier("/min/gdcni9/wefree2");
// +System.currentTimeMillis());
// 初始化face
LogicFace face=new LogicFace();
// face.initWithTcp("14.215.134.202",14922);
face.initWithUdp("14.215.134.202",14922);
face.initWithTcp("14.215.134.202",14922);
// face.initWithUdp("14.215.134.202",14922);
RegisterPrefixHelper helper=new RegisterPrefixHelper();
KeyManagerExample.INSTANCE.initKeyChain("/wefree/test","D://");
face.setKeyChain(KeyManagerExample.INSTANCE.getKeyChain());
face.registerIdentifier(identifier,5000,helper);
// Thread.sleep(1000*60);
}
}
+4
View File
@@ -79,6 +79,10 @@ public class CPacketTest_WF {
Block block=new Block(buf,false);
System.out.println("block len: "+block.getLength());
// 测试encodeSelf()
cPacket.encodeSelf();
System.out.println("encodeSelf res: "+ Arrays.toString(cPacket.getRawData()));
CPacket newPacket=new CPacket();
System.out.println("decode res: "+newPacket.wireDecode(block));
System.out.println("src: "+newPacket.getSrcIdentifier().toUri());