diff --git a/src/main/java/logicface/LogicFaceICN.java b/src/main/java/logicface/LogicFaceICN.java index ccdd3e2..dd0b0fb 100644 --- a/src/main/java/logicface/LogicFaceICN.java +++ b/src/main/java/logicface/LogicFaceICN.java @@ -386,7 +386,7 @@ public class LogicFaceICN extends LogicFace { * 设置一个标识监听,并绑定监听的函数,通过fib表来记录什么前缀对应什么兴趣包到达处理函数 * @param identifier 监听的标识 * @param timeout 注册前缀超时时间 - * @param helper + * @param helper * @param onInterestInterface 收到兴趣包的处理函数 * @param onRegisterFailInterface 注册标识失败的回调函数 * @param onRegisterSuccessInterface 注册标识成功的回调函数 diff --git a/src/main/java/logicface/StreamTransport.java b/src/main/java/logicface/StreamTransport.java index 018a423..45b2484 100644 --- a/src/main/java/logicface/StreamTransport.java +++ b/src/main/java/logicface/StreamTransport.java @@ -35,7 +35,7 @@ public class StreamTransport extends Transport implements ITransport{ try { this.channel.close(); } catch (IOException e) { - LoggerHelper.logger.debug(e.getMessage()); + LoggerHelper.logger.debug("StreamTransport.close: "+e.getMessage()); } } @@ -55,6 +55,7 @@ public class StreamTransport extends Transport implements ITransport{ while (writeLen< encodeBuf.length){ int writeRet=this.channel.write(ByteBuffer.wrap(encodeBuf)); if(writeRet<0){ + LoggerHelper.logger.error("send to tcp transport error"); this.linkService.logicFace.shutDown(); return false; } @@ -83,7 +84,7 @@ public class StreamTransport extends Transport implements ITransport{ private Pair readPktAndDeal(byte[] buf, long bufLen) throws LogicFaceException { try { // 如果接收到的数据长度小于 LpPacket type 字段的长度 3字节 则要等待 - if(bufLen< (new VlInt(TLV.TlvLpPacket).size())){ + if(bufLen < (new VlInt(TLV.TlvLpPacket).size())){ return new Pair(0L,null); } VlInt pktType=TLV.readType(buf,new VlInt(0)); @@ -175,7 +176,7 @@ public class StreamTransport extends Transport implements ITransport{ return true; } this.channel.socket().setSoTimeout((int)(duration)); - return false; + return true; } catch (SocketException e) { throw new LogicFaceException("StreamTransport.setReadTimeout"+e.getMessage()); } diff --git a/src/main/java/logicface/Transport.java b/src/main/java/logicface/Transport.java index 84283ab..7759419 100644 --- a/src/main/java/logicface/Transport.java +++ b/src/main/java/logicface/Transport.java @@ -22,9 +22,9 @@ public class Transport { // TODO 虽然第一个版本可能用不到linkService分包与合包功能, // 但以后可能需要客户端也能直接发出用以太网封装的MIN网络包,所以还是留着linkService - public void close(){ - throw new Error("implement me"); - } +// public void close(){ +// throw new Error("implement me"); +// } /** * 从[]byte中解析出LpPacket diff --git a/src/main/java/logicface/UdpTransport.java b/src/main/java/logicface/UdpTransport.java index 716784b..15659d9 100644 --- a/src/main/java/logicface/UdpTransport.java +++ b/src/main/java/logicface/UdpTransport.java @@ -1,5 +1,6 @@ package logicface; +import common.LoggerHelper; import packet.LpPacket; import java.net.*; @@ -30,19 +31,33 @@ public class UdpTransport extends Transport implements ITransport{ * 用于接收数据的transport的初始化函数 * @param channel */ - public void init(DatagramChannel channel) throws LogicFaceException { + public void init(DatagramChannel channel,SocketAddress remoteUdpAddr) throws LogicFaceException { try { this.channel=channel; this.localAddr=channel.getLocalAddress().toString(); this.localUri="udp://"+this.localAddr; - this.remoteAddr=channel.getRemoteAddress().toString(); + + this.remoteAddr=remoteUdpAddr.toString(); this.remoteUri="udp://"+this.remoteAddr; this.recvBuf=new byte[9000]; + this.remoteUdpAddr=remoteUdpAddr; } catch (IOException e) { throw new LogicFaceException("UdpTransport.init: "+e.getMessage()); } } + /** + * 关闭 + */ + @Override + public void close() { + try { + this.channel.close(); + } catch (IOException e) { + LoggerHelper.logger.error(e.getMessage()); + } + } + /** * 发送一个lpPacket * @param lpPacket @@ -55,6 +70,7 @@ public class UdpTransport extends Transport implements ITransport{ if(encodeBuf.length<=0){ return false; } + LoggerHelper.logger.debug("start write to udp : "+this.remoteUdpAddr.toString()); this.channel.socket().connect(remoteUdpAddr); int res=this.channel.write(ByteBuffer.wrap(encodeBuf)); if(res<0){ @@ -76,7 +92,7 @@ public class UdpTransport extends Transport implements ITransport{ if(remoteUdpAddr==null){ return null; } - //todo: 日志记录addr + LoggerHelper.logger.debug("recv from udp: "+this.remoteAddr); LpPacket lpPacket=this.parseByteArray2LpPacket(this.recvBuf); if(lpPacket==null){ return null; @@ -100,7 +116,7 @@ public class UdpTransport extends Transport implements ITransport{ return true; } this.channel.socket().setSoTimeout((int)(duration)); - return false; + return true; } catch (SocketException e) { throw new LogicFaceException("UdpTransport.setReadTimeout"+e.getMessage()); } diff --git a/src/main/java/logicface/UnixStreamTransport.java b/src/main/java/logicface/UnixStreamTransport.java index c1669e6..aa089bf 100644 --- a/src/main/java/logicface/UnixStreamTransport.java +++ b/src/main/java/logicface/UnixStreamTransport.java @@ -11,13 +11,13 @@ import packet.LpPacket; import util.ByteHelper; import java.io.IOException; -import java.net.SocketException; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; /* * @Author: Wang Feng - * @Description: + * @Description: 这里与go版本的minlib的实现不同,这是因为java基础库中似乎没有实现unixsocket。 + * 这里调用了maven的一个jnr-unixsocket库进行unix-transport的实现。 + * 目前,不需要对该类进行测试,因为Java版本主要用于客户端,而客户端暂时没有unixsocket传输需求。 * @Version: 1.0.0 * @Date: 15:37 2021/4/16 * @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室