Files
min-dev-java/src/main/java/logicface/UnixStreamTransport.java
T
2021-04-26 18:59:08 +08:00

197 lines
8.7 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package logicface;
import common.LoggerHelper;
import encoding.TLV;
import encoding.TLVException;
import encoding.VlInt;
import encoding.VlIntException;
import javafx.util.Pair;
import jnr.unixsocket.UnixSocketChannel;
import packet.LpPacket;
import util.ByteHelper;
import java.io.IOException;
import java.nio.ByteBuffer;
/*
* @Author: Wang Feng
* @Description: 这里与go版本的minlib的实现不同,这是因为java基础库中似乎没有实现unixsocket。
* 这里调用了maven的一个jnr-unixsocket库进行unix-transport的实现。
* 目前,不需要对该类进行测试,因为Java版本主要用于客户端,而客户端暂时没有unixsocket传输需求。
* @Version: 1.0.0
* @Date: 15:37 2021/4/16
* @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室
*/
public class UnixStreamTransport extends StreamTransport{
public UnixSocketChannel channel;
// 数据接收缓冲区
public byte[] recvBuf;
// 当前数据接收缓冲区中的有效数据的长度
public long recvLen;
/**
* 初始化UnixStreamTransport
* @param channel
*/
public void init(UnixSocketChannel channel) throws LogicFaceException {
this.channel=channel;
this.localAddr=channel.getLocalSocketAddress().toString();
this.localUri="unix://"+this.localAddr;
this.remoteAddr=channel.getRemoteSocketAddress().toString();
this.remoteUri="unix://"+this.remoteAddr;
this.recvBuf=new byte[1024*1028*4];
this.recvLen=0;
}
@Override
public void close(){
try {
this.channel.close();
} catch (IOException e) {
LoggerHelper.logger.debug(e.getMessage());
}
}
/**
* 将lpPacket对象编码成字节数组后,通过流式通道发送出去
* @param lpPacket
* @return
*/
@Override
public boolean send(LpPacket lpPacket) throws LogicFaceException {
try {
byte[] encodeBuf=this.encodeLpPacket2ByteArray(lpPacket);
if(encodeBuf==null){
return false;
}
int writeLen=0;
while (writeLen< encodeBuf.length){
int writeRet=this.channel.write(ByteBuffer.wrap(encodeBuf));
if(writeRet<0){
this.linkService.logicFace.shutDown();
return false;
}
writeLen+=writeRet;
}
return true;
} catch (LogicFaceException | IOException e) {
throw new LogicFaceException("StreamTransport.send: "+e.getMessage());
}
}
/**
* 从接收缓冲区中读取包并调用linkService的ReceivePacket函数处理包
* // 每次收到数据时会调用这个函数从数据接收缓冲区中尝试读出一个LpPacket包。
* // 工作流程:
* // (1) 首先,如果数据缓冲区中收到的字节数不足以构成了一个TLV的Type字段,则返回 nil,0 表示还需要等待数据接收
* // (2 如果解析出来的Type值不等于TlvLpPacket, 表示接收的数据出错了,需要提示调用者关闭Face
* // (3) 如果接收到的数据还小于一个LpPacket的最小长度(不含负载的只含头部的长度), 表示还需要等待数据接收
* // 4 如果长度足够,TLV的Length部分,并计算一个LpPacket的总长度 totalPktLen
* // (5) 如果接收到的数据长度小于 totalPktLen , 则还需要等待后面数据接收
* // 6 从[]byte中解析出LpPacket,并调用linkService.ReceivePacket(lpPacket) 处理一个完整的LpPacket包
* @param buf
* @param bufLen
* @return
*/
private Pair<Long,LpPacket> readPktAndDeal(byte[] buf, long bufLen) throws LogicFaceException {
try {
// 如果接收到的数据长度小于 LpPacket type 字段的长度 3字节 则要等待
if(bufLen< (new VlInt(TLV.TlvLpPacket).size())){
return new Pair<Long, LpPacket>(0L,null);
}
VlInt pktType=TLV.readType(buf,new VlInt(0));
// 如果数据类型的 TLV 和 type值不等于 encoding.TlvLpPacket 则接收出错,应该关闭当前logicFace
if(pktType.getVlIntValue2Int()!=TLV.TlvLpPacket){
return new Pair<Long, LpPacket>(0L,null);
}
// 如果接收到的数据长度小于 LpPacket 的小于长度 则要等待
if(bufLen<this.linkService.lpPacketHeadSize){
return new Pair<Long, LpPacket>(0L,null);
}
int pktTypeLen=pktType.size();
int pktLen=TLV.readVarNumber(buf,new VlInt(pktTypeLen)).getVlIntValue2Int();
int totalPktLen=pktTypeLen+new VlInt(pktLen).getVlIntValue2Int()+pktLen;
if(bufLen>=totalPktLen){
LpPacket lpPacket=this.parseByteArray2LpPacket(
ByteHelper.getLenBytes(buf,0,totalPktLen));
return new Pair<Long, LpPacket>((long) totalPktLen,lpPacket);
}
return new Pair<Long, LpPacket>(0L,null);
} catch (TLVException | VlIntException e) {
throw new LogicFaceException("StreamTransport.readPktAndDeal: "+e.getMessage());
}
}
/**
* 接收到数据后,处理包
* // 1 调用 readPktAndDeal ,传入当前接收的到数据的[]byte,以及接收到的数据长度
* // 2 如果readPktAndDeal 返回错误,则将错误抛给上层调用者
* // 3 如果readPktAndDeal没返回错误,且返回的已经被处理的LpPacket长度pktLen大于0, 则循环做以下操作
* // a 统计已经处理的数据长度dealLen(等于每次处理包长度的总和),如果已经处理的长度小接收数据长度t.recvLen,
* // 而且 readPktAndDeal返回的错误为nil,且返回的pktLen > 0 再次调用readPktAndDeal去处理数据
* // b 如果循环中readPktAndDeal返回的错误不为nil,则终止循环,并将错误报给调用者
* // (4) 如果统计到的总处理长度 dealLen 大小0, 则将已经处理的数据从数据接收缓冲区中删除。删除的方法是将recvBuf[dealLen:t.recvLen]
* // 移到 t.recvBuf[:] , 即将未处理的数据移到接收缓冲区开关,并将接收数据长度t.recvLen 送去 已经处理的长度 dealLen
* @return
*/
private LpPacket doReceive() throws LogicFaceException {
Pair<Long,LpPacket> packetPair=this.readPktAndDeal(
ByteHelper.getLenBytes(this.recvBuf,0, (int) this.recvLen),this.recvLen);
long pktLen=packetPair.getKey();
LpPacket lpPacket=packetPair.getValue();
// 循环多次尝试从接收缓冲区中读出包并处理
if(pktLen>0){
System.arraycopy(this.recvBuf,(int)pktLen,
this.recvBuf,0,(int)(this.recvLen-pktLen));
this.recvLen-=pktLen;
}
return lpPacket;
}
/**
* 用协程调用,不断地从流式通道中读出数据
* // (1) 从流式通道中读出数据,如果读出错,则关闭face
* // 2 如果读到数据,则调用onReceive尝试处理接收到的数据
* // (3) 如果数据处理出错, 则关闭face
* @return
*/
@Override
public LpPacket receive() throws LogicFaceException {
try {
while (true) {
LpPacket lpPacket = this.receive();
if (lpPacket != null) {
return lpPacket;
}
int recvRet = this.channel.read(ByteBuffer.wrap(ByteHelper.
getLenBytes(this.recvBuf, (int) this.recvLen, (int) (this.recvBuf.length - this.recvLen))));
if(recvRet<0){
return null;
}
this.recvLen+=recvRet;
}
} catch (IOException e) {
throw new LogicFaceException("StreamTransport.receive: "+e.getMessage());
}
}
/**
* 设置读超时时间
* @param duration 超时时间 , 以 毫秒 为单位, 小于等于0,表示永不超时
* @return
*/
// @Override
// public boolean setReadTimeout(long duration) throws LogicFaceException {
// try {
// if (duration <= 0) {
// this.channel.setSoTimeout(0);
// return true;
// }
// this.channel.socket().setSoTimeout((int)(duration));
// return false;
// } catch (SocketException e) {
// throw new LogicFaceException("StreamTransport.setReadTimeout"+e.getMessage());
// }
// }
}