diff --git a/src/main/java/logicface/LinkService.java b/src/main/java/logicface/LinkService.java index 3fea8cf..8940de6 100644 --- a/src/main/java/logicface/LinkService.java +++ b/src/main/java/logicface/LinkService.java @@ -256,26 +256,63 @@ public class LinkService { return false; } } catch (LogicFaceException e) { - throw new LogicFaceException("LinkService.sendCPacket: "+e.getMessage()); + throw new LogicFaceException("LinkService.sendQuickCPacket: "+e.getMessage()); + } + } + + /** + * 将已经编码好的CPacket变成LpPacket(不分包),然后快速发出去 + * @param encodedBytes + * @return + * @throws LogicFaceException + */ + public boolean sendQuickCPacketV3(byte[] encodedBytes) throws LogicFaceException { + try { + LpPacket lpPacket=getLpPacketFromQuickCPacket(encodedBytes); + if(lpPacket!=null) { + return this.transport.send(lpPacket); + }else { + return false; + } + } catch (LogicFaceException e) { + throw new LogicFaceException("LinkService.sendQuickCPacketV3: "+e.getMessage()); + } + } + + private LpPacket getLpPacketFromQuickCPacket(byte[] encodedBytes) throws LogicFaceException { + try { + // 1.byte[](CPacket编码而成) => LpPacket + LpPacket lpPacket=new LpPacket(); + lpPacket.setId(this.lpPacketId); + lpPacket.setFragmentNum(1); //分片数 + lpPacket.setFragmentSeq(0); //片序号 + lpPacket.setValue(encodedBytes); + + this.lpPacketId++; + return lpPacket; +// } catch (PacketException | EncoderException e) { + } catch (Exception e) { + throw new LogicFaceException("LinkService.getMINPacketFromLpPacket: "+e.getMessage()); } } private LpPacket getLpPacketFromCPacket(CPacket cPacket) throws LogicFaceException { try { -// // 1.CPacket => byte[] -// Encoder encoder=new Encoder(); -// if(!encoder.encoderReset(new SizeT(Encoder.MaxPacketSize),new SizeT(0))){ -// return null; -// } -// int bufLen=cPacket.wireEncode(encoder); -// byte[] buf=encoder.getBuffer(); + // 1.CPacket => byte[] + Encoder encoder=new Encoder(); + if(!encoder.encoderReset(new SizeT(Encoder.MaxPacketSize),new SizeT(0))){ + return null; + } + int bufLen=cPacket.wireEncode(encoder); + byte[] buf=encoder.getBuffer(); // 2.byte[] => LpPacket LpPacket lpPacket=new LpPacket(); lpPacket.setId(this.lpPacketId); lpPacket.setFragmentNum(1); //分片数 lpPacket.setFragmentSeq(0); //片序号 - lpPacket.setValue(cpacketBytes); +// lpPacket.setValue(cpacketBytes); + lpPacket.setValue(buf); this.lpPacketId++; return lpPacket; diff --git a/src/main/java/logicface/LogicFace.java b/src/main/java/logicface/LogicFace.java index 6a2e812..9590dab 100644 --- a/src/main/java/logicface/LogicFace.java +++ b/src/main/java/logicface/LogicFace.java @@ -303,15 +303,25 @@ public class LogicFace { } /** - * 快发包(暂时仅支持TCP):减少打包过程,迅速将包发送出去 + * 快发包:减少分包过程,迅速将包发送出去 * @param cPacket * @return * @throws LogicFaceException */ - public boolean sendQuickCPacketByTCP(CPacket cPacket) throws LogicFaceException { + public boolean sendQuickCPacket(CPacket cPacket) throws LogicFaceException { return this.linkService.sendQuickCPacket(cPacket); } + /** + * 快发包V3:减少TLV编码及分包过程 + * @param encodedBytes + * @return + * @throws LogicFaceException + */ + public boolean sendQuickCPacketV3(byte[] encodedBytes) throws LogicFaceException { + return this.linkService.sendQuickCPacketV3(encodedBytes); + } + /** * 发送一个兴趣包 * @param interest diff --git a/src/main/java/packet/CPacket.java b/src/main/java/packet/CPacket.java index 25e33b7..45ec820 100644 --- a/src/main/java/packet/CPacket.java +++ b/src/main/java/packet/CPacket.java @@ -40,6 +40,7 @@ public class CPacket implements InteractWithField, IEncodingAble { public Payload payload=new Payload(); public Identifier srcIdentifier=new Identifier(); public Identifier dstIdentifier=new Identifier(); + public byte[] rawData=null; // 放置cpacket已经编码后的数据 public CPacket(){ } @@ -52,6 +53,33 @@ public class CPacket implements InteractWithField, IEncodingAble { this.ttl=ttl; } + public byte[] getRawData(){ + return rawData; + } + + /** + * 编码CPacket + * @return + */ + public boolean encodeSelf(){ + try { + Encoder encoder=new Encoder(); + if(!encoder.encoderReset(new SizeT(Encoder.MaxPacketSize),new SizeT(0))){ + return false; + } + + int bufLen=this.wireEncode(encoder); + if(bufLen<0){ + return false; + } + this.rawData=encoder.getBuffer(); + return true; + } catch (EncoderException | PacketException e) { + e.printStackTrace(); + return false; + } + } + public CPacket createCPacketByMINPacket(MINPacket minPacket) throws PacketException { try { CPacket cPacket=new CPacket(); diff --git a/src/test/java/mgmt/RegisterPrefixHelperTest.java b/src/test/java/mgmt/RegisterPrefixHelperTest.java index 60bea73..a129af0 100644 --- a/src/test/java/mgmt/RegisterPrefixHelperTest.java +++ b/src/test/java/mgmt/RegisterPrefixHelperTest.java @@ -51,7 +51,7 @@ public class RegisterPrefixHelperTest { byte[] pay=new byte[1000]; cPacket.payload.setValue(pay); ConcurrentLinkedQueue readTunCPackets = new ConcurrentLinkedQueue<>(); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000*1000; i++) { readTunCPackets.offer(cPacket); } @@ -63,8 +63,8 @@ public class RegisterPrefixHelperTest { // 出队 CPacket newPacket = readTunCPackets.poll(); if(newPacket!=null) { - face.sendQuickCPacketByTCP(newPacket); -// face.sendCPacket(newPacket); +// face.sendQuickCPacketByTCP(newPacket); + face.sendCPacket(newPacket); } } System.out.println("nums: "+nums); @@ -76,16 +76,18 @@ public class RegisterPrefixHelperTest { @Test public void registerPrefixByTest() throws Exception { // 注册前缀Identifier - Identifier identifier=new Identifier("/min/gdcni9/wefree/" - +System.currentTimeMillis()); + Identifier identifier=new Identifier("/min/gdcni9/wefree2"); +// +System.currentTimeMillis()); // 初始化face LogicFace face=new LogicFace(); -// face.initWithTcp("14.215.134.202",14922); - face.initWithUdp("14.215.134.202",14922); + face.initWithTcp("14.215.134.202",14922); +// face.initWithUdp("14.215.134.202",14922); RegisterPrefixHelper helper=new RegisterPrefixHelper(); KeyManagerExample.INSTANCE.initKeyChain("/wefree/test","D://"); face.setKeyChain(KeyManagerExample.INSTANCE.getKeyChain()); face.registerIdentifier(identifier,5000,helper); + +// Thread.sleep(1000*60); } } diff --git a/src/test/java/packet/CPacketTest_WF.java b/src/test/java/packet/CPacketTest_WF.java index c0934fc..de77c2c 100644 --- a/src/test/java/packet/CPacketTest_WF.java +++ b/src/test/java/packet/CPacketTest_WF.java @@ -79,6 +79,10 @@ public class CPacketTest_WF { Block block=new Block(buf,false); System.out.println("block len: "+block.getLength()); + // 测试encodeSelf() + cPacket.encodeSelf(); + System.out.println("encodeSelf res: "+ Arrays.toString(cPacket.getRawData())); + CPacket newPacket=new CPacket(); System.out.println("decode res: "+newPacket.wireDecode(block)); System.out.println("src: "+newPacket.getSrcIdentifier().toUri());