rewrite UnixStreamTransport & supplement LogicFace

This commit is contained in:
free will
2021-04-21 17:48:10 +08:00
parent e5ddbbac4e
commit 8e8767393c
4 changed files with 298 additions and 23 deletions
+16
View File
@@ -127,6 +127,22 @@
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<!-- Unix Socket -->
<!-- https://mvnrepository.com/artifact/com.github.jnr/jnr-unixsocket -->
<!-- <dependency>-->
<!-- <groupId>com.github.jnr</groupId>-->
<!-- <artifactId>jnr-unixsocket</artifactId>-->
<!-- <version>0.18</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/com.cloudbees.util/jnr-unixsocket-nodep -->
<dependency>
<groupId>com.cloudbees.util</groupId>
<artifactId>jnr-unixsocket-nodep</artifactId>
<version>0.3.1</version>
</dependency>
</dependencies>
</project>
+105 -9
View File
@@ -1,10 +1,25 @@
package logicface;
import com.sun.deploy.net.socket.UnixDomainSocket;
import common.LoggerHelper;
import component.ComponentException;
import component.Identifier;
import component.IdentifierWrapper;
import encoding.TLV;
import encoding.VlIntException;
import jdk.nashorn.internal.lookup.MethodHandleFactory;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
import packet.*;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
/*
* @Author: Wang Feng
* @Description: 逻辑接口类
@@ -23,7 +38,7 @@ public class LogicFace {
// 成员变量区
public int logicFaceType;
public ITransport transport; // 与logicFace绑定的transport
public LinkService linkService=new LinkService(); // 与logicFace绑定的linkService
public LinkService linkService; // 与logicFace绑定的linkService
public LogicFaceCounters logicFaceCounters=new LogicFaceCounters(); // logicFace 流量统计对象
public long expireTime; // 超时时间 ms
public boolean state; // true 为 up , false 为down
@@ -34,8 +49,24 @@ public class LogicFace {
* @param port MIR的TCP端口号,如 13899
* @return
*/
public boolean initWithTcp(String ip,String port){
return true;
public boolean initWithTcp(String ip,String port) throws LogicFaceException {
try {
SocketChannel channel=SocketChannel.open();
channel.connect(new InetSocketAddress(ip, Integer.parseInt(port)));
this.linkService=new LinkService();
TcpTransport tcpTransport=new TcpTransport();
tcpTransport.init(channel);
this.linkService.init(9000); // 设置MTU为 9000 字节
this.linkService.logicFace=this;
this.linkService.transport=tcpTransport;
tcpTransport.linkService=this.linkService;
this.transport=tcpTransport;
this.logicFaceType=LogicFaceTypeTCP;
this.state=true;
return true;
} catch (IOException e) {
throw new LogicFaceException("LogicFace.initWithTcp: "+e.getMessage());
}
}
/**
@@ -44,8 +75,24 @@ public class LogicFace {
* @param port MIR的TCP端口号,如 13899
* @return
*/
public boolean initWithUdp(String ip,String port){
return true;
public boolean initWithUdp(String ip,String port) throws LogicFaceException {
try {
DatagramChannel channel=DatagramChannel.open();
channel.connect(new InetSocketAddress(ip, Integer.parseInt(port)));
this.linkService=new LinkService();
UdpTransport udpTransport=new UdpTransport();
udpTransport.init(channel);
this.linkService.init(9000); // 设置MTU为 9000 字节
this.linkService.logicFace=this;
this.linkService.transport=udpTransport;
udpTransport.linkService=this.linkService;
this.transport=udpTransport;
this.logicFaceType=LogicFaceTypeUDP;
this.state=true;
return true;
} catch (IOException e) {
throw new LogicFaceException("LogicFace.initWithUdp: "+e.getMessage());
}
}
/**
@@ -53,8 +100,26 @@ public class LogicFace {
* @param path MIR的unix socket地址,默认是 "/tmp/mir-sock"
* @return
*/
public boolean initWithUnixSocket(String path){
return true;
public boolean initWithUnixSocket(String path) throws LogicFaceException {
try {
// 建立 Unix Socket 连接
File unixFile = new File(path);
UnixSocketChannel channel=UnixSocketChannel.open();
channel.connect(new UnixSocketAddress(unixFile));
this.linkService=new LinkService();
UnixStreamTransport unixStreamTransport=new UnixStreamTransport();
unixStreamTransport.init(channel);
this.linkService.init(9000); // 设置MTU为 9000 字节
this.linkService.logicFace=this;
this.linkService.transport=unixStreamTransport;
unixStreamTransport.linkService=this.linkService;
this.transport=unixStreamTransport;
this.logicFaceType=LogicFaceTypeUnix;
this.state=true;
return true;
} catch (IOException e) {
throw new LogicFaceException("LogicFace.initWithUnix: "+e.getMessage());
}
}
/**
@@ -66,8 +131,39 @@ public class LogicFace {
* @param timeout 超时时间,以 毫秒 为单位
* @return
*/
public boolean registerIdentifier(Identifier identifier,long timeout){
return true;
public boolean registerIdentifier(Identifier identifier,long timeout) throws LogicFaceException {
try {
Interest interest=MgmtCommand.createRegisterIdentifierInterest(identifier);
if(interest==null){
return false;
}
if(!this.sendInterest(interest)){
return false;
}
if(!this.transport.setReadTimeout(timeout)){
return false;
}
MINPacket minPacket=this.receivePacket();
if(minPacket==null){
if(!this.transport.setReadTimeout(-1)){
return false;
}
}
IdentifierWrapper identifierWrapper=minPacket.identifierField.getIdentifier(0);
if(identifierWrapper==null){
return false;
}
if(identifierWrapper.getTlvType().getVlIntValue2Int()!=TLV.TlvIdentifierContentData){
LoggerHelper.logger.error("receive packet is not data");
return false;
}
// TODO 解析数据包
return true;
} catch (LogicFaceException | VlIntException | ComponentException e) {
throw new LogicFaceException("LogicFace.registerIdentifier: "+e.getMessage());
}
}
/**
+2 -1
View File
@@ -1,5 +1,6 @@
package logicface;
import common.LoggerHelper;
import encoding.TLV;
import encoding.TLVException;
import encoding.VlInt;
@@ -34,7 +35,7 @@ public class StreamTransport extends Transport implements ITransport{
try {
this.channel.close();
} catch (IOException e) {
// todo: 日志记录 e.printStackTrace();
LoggerHelper.logger.debug(e.getMessage());
}
}
+175 -13
View File
@@ -1,6 +1,18 @@
package logicface;
import common.LoggerHelper;
import encoding.TLV;
import encoding.TLVException;
import encoding.VlInt;
import encoding.VlIntException;
import javafx.util.Pair;
import jnr.unixsocket.UnixSocketChannel;
import packet.LpPacket;
import util.ByteHelper;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/*
@@ -11,22 +23,172 @@ import java.nio.channels.SocketChannel;
* @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室
*/
public class UnixStreamTransport extends StreamTransport{
public UnixSocketChannel channel;
// 数据接收缓冲区
public byte[] recvBuf;
// 当前数据接收缓冲区中的有效数据的长度
public long recvLen;
/**
* 初始化UnixStreamTransport
* @param channel
*/
public void init(SocketChannel channel) throws LogicFaceException {
try {
this.channel=channel;
this.localAddr=channel.getLocalAddress().toString();
this.localUri="unix://"+this.localAddr;
this.remoteAddr=channel.getRemoteAddress().toString();
this.remoteUri="unix://"+this.remoteAddr;
this.recvBuf=new byte[1024*1028*4];
this.recvLen=0;
} catch (IOException e) {
throw new LogicFaceException("TcpTransport.init: "+e.getMessage());
}
public void init(UnixSocketChannel channel) throws LogicFaceException {
this.channel=channel;
this.localAddr=channel.getLocalSocketAddress().toString();
this.localUri="unix://"+this.localAddr;
this.remoteAddr=channel.getRemoteSocketAddress().toString();
this.remoteUri="unix://"+this.remoteAddr;
this.recvBuf=new byte[1024*1028*4];
this.recvLen=0;
}
@Override
public void close(){
try {
this.channel.close();
} catch (IOException e) {
LoggerHelper.logger.debug(e.getMessage());
}
}
/**
* 将lpPacket对象编码成字节数组后,通过流式通道发送出去
* @param lpPacket
* @return
*/
@Override
public boolean send(LpPacket lpPacket) throws LogicFaceException {
try {
byte[] encodeBuf=this.encodeLpPacket2ByteArray(lpPacket);
if(encodeBuf==null){
return false;
}
int writeLen=0;
while (writeLen< encodeBuf.length){
int writeRet=this.channel.write(ByteBuffer.wrap(encodeBuf));
if(writeRet<0){
this.linkService.logicFace.shutDown();
return false;
}
writeLen+=writeRet;
}
return true;
} catch (LogicFaceException | IOException e) {
throw new LogicFaceException("StreamTransport.send: "+e.getMessage());
}
}
/**
* 从接收缓冲区中读取包并调用linkService的ReceivePacket函数处理包
* // 每次收到数据时会调用这个函数从数据接收缓冲区中尝试读出一个LpPacket包。
* // 工作流程:
* // (1) 首先,如果数据缓冲区中收到的字节数不足以构成了一个TLV的Type字段,则返回 nil,0 表示还需要等待数据接收
* // (2 如果解析出来的Type值不等于TlvLpPacket, 表示接收的数据出错了,需要提示调用者关闭Face
* // (3) 如果接收到的数据还小于一个LpPacket的最小长度(不含负载的只含头部的长度), 表示还需要等待数据接收
* // 4 如果长度足够,TLV的Length部分,并计算一个LpPacket的总长度 totalPktLen
* // (5) 如果接收到的数据长度小于 totalPktLen , 则还需要等待后面数据接收
* // 6 从[]byte中解析出LpPacket,并调用linkService.ReceivePacket(lpPacket) 处理一个完整的LpPacket包
* @param buf
* @param bufLen
* @return
*/
private Pair<Long,LpPacket> readPktAndDeal(byte[] buf, long bufLen) throws LogicFaceException {
try {
// 如果接收到的数据长度小于 LpPacket type 字段的长度 3字节 则要等待
if(bufLen< (new VlInt(TLV.TlvLpPacket).size())){
return new Pair<Long, LpPacket>(0L,null);
}
VlInt pktType=TLV.readType(buf,new VlInt(0));
// 如果数据类型的 TLV 和 type值不等于 encoding.TlvLpPacket 则接收出错,应该关闭当前logicFace
if(pktType.getVlIntValue2Int()!=TLV.TlvLpPacket){
return new Pair<Long, LpPacket>(0L,null);
}
// 如果接收到的数据长度小于 LpPacket 的小于长度 则要等待
if(bufLen<this.linkService.lpPacketHeadSize){
return new Pair<Long, LpPacket>(0L,null);
}
int pktTypeLen=pktType.size();
int pktLen=TLV.readVarNumber(buf,new VlInt(pktTypeLen)).getVlIntValue2Int();
int totalPktLen=pktTypeLen+new VlInt(pktLen).getVlIntValue2Int()+pktLen;
if(bufLen>=totalPktLen){
LpPacket lpPacket=this.parseByteArray2LpPacket(
ByteHelper.getLenBytes(buf,0,totalPktLen));
return new Pair<Long, LpPacket>((long) totalPktLen,lpPacket);
}
return new Pair<Long, LpPacket>(0L,null);
} catch (TLVException | VlIntException e) {
throw new LogicFaceException("StreamTransport.readPktAndDeal: "+e.getMessage());
}
}
/**
* 接收到数据后,处理包
* // 1 调用 readPktAndDeal ,传入当前接收的到数据的[]byte,以及接收到的数据长度
* // 2 如果readPktAndDeal 返回错误,则将错误抛给上层调用者
* // 3 如果readPktAndDeal没返回错误,且返回的已经被处理的LpPacket长度pktLen大于0, 则循环做以下操作
* // a 统计已经处理的数据长度dealLen(等于每次处理包长度的总和),如果已经处理的长度小接收数据长度t.recvLen,
* // 而且 readPktAndDeal返回的错误为nil,且返回的pktLen > 0 再次调用readPktAndDeal去处理数据
* // b 如果循环中readPktAndDeal返回的错误不为nil,则终止循环,并将错误报给调用者
* // (4) 如果统计到的总处理长度 dealLen 大小0, 则将已经处理的数据从数据接收缓冲区中删除。删除的方法是将recvBuf[dealLen:t.recvLen]
* // 移到 t.recvBuf[:] , 即将未处理的数据移到接收缓冲区开关,并将接收数据长度t.recvLen 送去 已经处理的长度 dealLen
* @return
*/
private LpPacket doReceive() throws LogicFaceException {
Pair<Long,LpPacket> packetPair=this.readPktAndDeal(
ByteHelper.getLenBytes(this.recvBuf,0, (int) this.recvLen),this.recvLen);
long pktLen=packetPair.getKey();
LpPacket lpPacket=packetPair.getValue();
// 循环多次尝试从接收缓冲区中读出包并处理
if(pktLen>0){
//todo: 啥玩意儿???
}
return lpPacket;
}
/**
* 用协程调用,不断地从流式通道中读出数据
* // (1) 从流式通道中读出数据,如果读出错,则关闭face
* // 2 如果读到数据,则调用onReceive尝试处理接收到的数据
* // (3) 如果数据处理出错, 则关闭face
* @return
*/
@Override
public LpPacket receive() throws LogicFaceException {
try {
while (true) {
LpPacket lpPacket = this.receive();
if (lpPacket != null) {
return lpPacket;
}
int recvRet = this.channel.read(ByteBuffer.wrap(ByteHelper.
getLenBytes(this.recvBuf, (int) this.recvLen, (int) (this.recvBuf.length - this.recvLen))));
if(recvRet<0){
return null;
}
this.recvLen+=recvRet;
}
} catch (IOException e) {
throw new LogicFaceException("StreamTransport.receive: "+e.getMessage());
}
}
/**
* 设置读超时时间
* @param duration 超时时间 , 以 毫秒 为单位, 小于等于0,表示永不超时
* @return
*/
// @Override
// public boolean setReadTimeout(long duration) throws LogicFaceException {
// try {
// if (duration <= 0) {
// this.channel.setSoTimeout(0);
// return true;
// }
// this.channel.socket().setSoTimeout((int)(duration));
// return false;
// } catch (SocketException e) {
// throw new LogicFaceException("StreamTransport.setReadTimeout"+e.getMessage());
// }
// }
}