writing UdpTransport&StreamTransport ...

This commit is contained in:
free will
2021-04-16 13:28:03 +08:00
parent 6ff8e7a47d
commit 31ce05ea71
4 changed files with 251 additions and 39 deletions
+3 -3
View File
@@ -20,13 +20,13 @@ public interface ITransport{
* @param lpPacket
* @return
*/
boolean send(LpPacket lpPacket);
boolean send(LpPacket lpPacket) throws LogicFaceException;
/**
* 从网络中接收到一段数据
* @return
*/
LpPacket receive();
LpPacket receive() throws LogicFaceException;
/** 获得Transport的对端地址
* // 格式
@@ -76,5 +76,5 @@ public interface ITransport{
* @param duration 超时时间 , 以 毫秒 为单位, 小于等于0,表示永不超时
* @return
*/
boolean setReadTimeout(long duration);
boolean setReadTimeout(long duration) throws LogicFaceException;
}
+35 -26
View File
@@ -95,14 +95,19 @@ public class LinkService {
* @param fragmentSeq 第几块分片,从0开始
* @return
*/
private boolean sendFragment(byte[] buf,int bufLen,long fragmentId,long fragmentNum, long fragmentSeq){
LpPacket lpPacket=new LpPacket();
lpPacket.setId(fragmentId);
lpPacket.setFragmentNum(fragmentNum);
lpPacket.setFragmentSeq(fragmentSeq);
byte[] nbuf=ByteHelper.getLenBytes(buf,0,bufLen);
lpPacket.setValue(nbuf);
return this.transport.send(lpPacket);
private boolean sendFragment(byte[] buf,int bufLen,long fragmentId,
long fragmentNum, long fragmentSeq) throws LogicFaceException {
try {
LpPacket lpPacket=new LpPacket();
lpPacket.setId(fragmentId);
lpPacket.setFragmentNum(fragmentNum);
lpPacket.setFragmentSeq(fragmentSeq);
byte[] nbuf=ByteHelper.getLenBytes(buf,0,bufLen);
lpPacket.setValue(nbuf);
return this.transport.send(lpPacket);
} catch (LogicFaceException e) {
throw new LogicFaceException("LinkService.sendFragment: "+e.getMessage());
}
}
/**
@@ -111,27 +116,31 @@ public class LinkService {
* @param bufLen
* @return
*/
private boolean sendByteBuffer(byte[] buf,int bufLen){
int fragmentLen=this.mtu-this.lpPacketHeadSize-10;
int startIndex=0;
int fragmentSeq=0;
int fragmentNum=bufLen/fragmentLen;
if(bufLen%fragmentLen!=0){
fragmentNum++;
}
while (startIndex<bufLen){
if(fragmentLen>(bufLen-startIndex)){
fragmentLen=bufLen-startIndex;
private boolean sendByteBuffer(byte[] buf,int bufLen) throws LogicFaceException {
try {
int fragmentLen = this.mtu - this.lpPacketHeadSize - 10;
int startIndex = 0;
int fragmentSeq = 0;
int fragmentNum = bufLen / fragmentLen;
if (bufLen % fragmentLen != 0) {
fragmentNum++;
}
byte[] nbuf= ByteHelper.getLenBytes(buf,startIndex,fragmentLen);
if(!this.sendFragment(nbuf,fragmentLen,this.lpPacketId,fragmentNum,fragmentSeq)){
return false;
while (startIndex < bufLen) {
if (fragmentLen > (bufLen - startIndex)) {
fragmentLen = bufLen - startIndex;
}
byte[] nbuf = ByteHelper.getLenBytes(buf, startIndex, fragmentLen);
if (!this.sendFragment(nbuf, fragmentLen, this.lpPacketId, fragmentNum, fragmentSeq)) {
return false;
}
startIndex += fragmentLen;
fragmentSeq++;
}
startIndex+=fragmentLen;
fragmentSeq++;
this.lpPacketId++;
return true;
} catch (LogicFaceException e) {
throw new LogicFaceException("LinkService.sendByteBuffer: "+e.getMessage());
}
this.lpPacketId++;
return true;
}
/**
+130 -4
View File
@@ -1,6 +1,18 @@
package logicface;
import encoding.TLV;
import encoding.TLVException;
import encoding.VlInt;
import encoding.VlIntException;
import javafx.util.Pair;
import packet.LpPacket;
import util.ByteHelper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Map;
/*
* @Author: Wang Feng
@@ -10,13 +22,127 @@ import packet.LpPacket;
* @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室
*/
public class StreamTransport extends Transport implements ITransport{
@Override
public boolean send(LpPacket lpPacket) {
return false;
}
public SocketChannel channel;
// 数据接收缓冲区
public byte[] recvBuf;
// 当前数据接收缓冲区中的有效数据的长度
public long recvLen;
@Override
public void close(){
try {
this.channel.close();
} catch (IOException e) {
// todo: 日志记录 e.printStackTrace();
}
}
/**
* 将lpPacket对象编码成字节数组后,通过流式通道发送出去
* @param lpPacket
* @return
*/
@Override
public boolean send(LpPacket lpPacket) throws LogicFaceException {
try {
byte[] encodeBuf=this.encodeLpPacket2ByteArray(lpPacket);
if(encodeBuf==null){
return false;
}
int writeLen=0;
while (writeLen< encodeBuf.length){
int writeRet=this.channel.write(ByteBuffer.wrap(encodeBuf));
if(writeRet<0){
this.linkService.logicFace.shutDown();
return false;
}
writeLen+=writeRet;
}
return true;
} catch (LogicFaceException | IOException e) {
throw new LogicFaceException("StreamTransport.send: "+e.getMessage());
}
}
/**
* 从接收缓冲区中读取包并调用linkService的ReceivePacket函数处理包
* // 每次收到数据时会调用这个函数从数据接收缓冲区中尝试读出一个LpPacket包。
* // 工作流程:
* // (1) 首先,如果数据缓冲区中收到的字节数不足以构成了一个TLV的Type字段,则返回 nil,0 表示还需要等待数据接收
* // (2 如果解析出来的Type值不等于TlvLpPacket, 表示接收的数据出错了,需要提示调用者关闭Face
* // (3) 如果接收到的数据还小于一个LpPacket的最小长度(不含负载的只含头部的长度), 表示还需要等待数据接收
* // 4 如果长度足够,TLV的Length部分,并计算一个LpPacket的总长度 totalPktLen
* // (5) 如果接收到的数据长度小于 totalPktLen , 则还需要等待后面数据接收
* // 6 从[]byte中解析出LpPacket,并调用linkService.ReceivePacket(lpPacket) 处理一个完整的LpPacket包
* @param buf
* @param bufLen
* @return
*/
private Pair<Long,LpPacket> readPktAndDeal(byte[] buf, long bufLen) throws LogicFaceException {
try {
// 如果接收到的数据长度小于 LpPacket type 字段的长度 3字节 则要等待
if(bufLen< (new VlInt(TLV.TlvLpPacket).size())){
return new Pair<Long, LpPacket>(0L,null);
}
VlInt pktType=TLV.readType(buf,new VlInt(0));
// 如果数据类型的 TLV 和 type值不等于 encoding.TlvLpPacket 则接收出错,应该关闭当前logicFace
if(pktType.getVlIntValue2Int()!=TLV.TlvLpPacket){
return new Pair<Long, LpPacket>(0L,null);
}
// 如果接收到的数据长度小于 LpPacket 的小于长度 则要等待
if(bufLen<this.linkService.lpPacketHeadSize){
return new Pair<Long, LpPacket>(0L,null);
}
int pktTypeLen=pktType.size();
int pktLen=TLV.readVarNumber(buf,new VlInt(pktTypeLen)).getVlIntValue2Int();
int totalPktLen=pktTypeLen+new VlInt(pktLen).getVlIntValue2Int()+pktLen;
if(bufLen>=totalPktLen){
LpPacket lpPacket=this.parseByteArray2LpPacket(
ByteHelper.getLenBytes(buf,0,totalPktLen));
return new Pair<Long, LpPacket>((long) totalPktLen,lpPacket);
}
return new Pair<Long, LpPacket>(0L,null);
} catch (TLVException | VlIntException e) {
throw new LogicFaceException("StreamTransport.readPktAndDeal: "+e.getMessage());
}
}
/**
* 接收到数据后,处理包
* // 1 调用 readPktAndDeal ,传入当前接收的到数据的[]byte,以及接收到的数据长度
* // 2 如果readPktAndDeal 返回错误,则将错误抛给上层调用者
* // 3 如果readPktAndDeal没返回错误,且返回的已经被处理的LpPacket长度pktLen大于0, 则循环做以下操作
* // a 统计已经处理的数据长度dealLen(等于每次处理包长度的总和),如果已经处理的长度小接收数据长度t.recvLen,
* // 而且 readPktAndDeal返回的错误为nil,且返回的pktLen > 0 再次调用readPktAndDeal去处理数据
* // b 如果循环中readPktAndDeal返回的错误不为nil,则终止循环,并将错误报给调用者
* // (4) 如果统计到的总处理长度 dealLen 大小0, 则将已经处理的数据从数据接收缓冲区中删除。删除的方法是将recvBuf[dealLen:t.recvLen]
* // 移到 t.recvBuf[:] , 即将未处理的数据移到接收缓冲区开关,并将接收数据长度t.recvLen 送去 已经处理的长度 dealLen
* @return
*/
private LpPacket doReceive() throws LogicFaceException {
Pair<Long,LpPacket> packetPair=this.readPktAndDeal(
ByteHelper.getLenBytes(this.recvBuf,0, (int) this.recvLen),this.recvLen);
long pktLen=packetPair.getKey();
LpPacket lpPacket=packetPair.getValue();
// 循环多次尝试从接收缓冲区中读出包并处理
if(pktLen>0){
//todo: 啥玩意儿???
}
return lpPacket;
}
/**
* 用协程调用,不断地从流式通道中读出数据
* // (1) 从流式通道中读出数据,如果读出错,则关闭face
* // 2 如果读到数据,则调用onReceive尝试处理接收到的数据
* // (3) 如果数据处理出错, 则关闭face
* @return
*/
@Override
public LpPacket receive() {
while (true){
break;
}
return null;
}
+83 -6
View File
@@ -4,6 +4,11 @@ import packet.LpPacket;
import java.net.*;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.sql.Time;
/*
* @Author: Wang Feng
@@ -13,19 +18,91 @@ import java.io.*;
* @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室
*/
public class UdpTransport extends Transport implements ITransport{
// public DatagramSocket datagramSocket;
// UDP句柄 (channel比socket性能更好)
public DatagramChannel channel;
// 接收缓冲区,大小为 9000
public byte[] recvBuf;
// 对端UDP地址,用于发送UDP包
public SocketAddress remoteUdpAddr;
/**
* 用于接收数据的transport的初始化函数
* @param channel
*/
public void init(DatagramChannel channel) throws LogicFaceException {
try {
this.channel=channel;
this.localAddr=channel.getLocalAddress().toString();
this.localUri="udp://"+this.localAddr;
this.remoteAddr=channel.getRemoteAddress().toString();
this.remoteUri="udp://"+this.remoteAddr;
this.recvBuf=new byte[9000];
} catch (IOException e) {
throw new LogicFaceException("UdpTransport.init: "+e.getMessage());
}
}
/**
* 发送一个lpPacket
* @param lpPacket
* @return
*/
@Override
public boolean send(LpPacket lpPacket) {
return false;
public boolean send(LpPacket lpPacket) throws LogicFaceException {
try {
byte[] encodeBuf=this.encodeLpPacket2ByteArray(lpPacket);
if(encodeBuf.length<=0){
return false;
}
this.channel.socket().connect(remoteUdpAddr);
int res=this.channel.write(ByteBuffer.wrap(encodeBuf));
if(res<0){
return false;
}
return true;
} catch (LogicFaceException | IOException e) {
throw new LogicFaceException("UdpTransport.send: "+e.getMessage());
}
}
/**
* 从UDP句柄中接收到UDP包,并返回给上层
* @return
*/
private LpPacket doReceive() throws LogicFaceException {
try {
SocketAddress remoteUdpAddr=this.channel.receive(ByteBuffer.wrap(this.recvBuf));
if(remoteUdpAddr==null){
return null;
}
//todo: 日志记录addr
LpPacket lpPacket=this.parseByteArray2LpPacket(this.recvBuf);
if(lpPacket==null){
return null;
}
return lpPacket;
} catch (IOException e) {
throw new LogicFaceException("UdpTransport.doReceive: "+e.getMessage());
}
}
@Override
public LpPacket receive() {
return null;
public LpPacket receive() throws LogicFaceException {
return this.doReceive();
}
@Override
public boolean setReadTimeout(long duration) {
return false;
public boolean setReadTimeout(long duration) throws LogicFaceException {
try {
if (duration <= 0) {
this.channel.socket().setSoTimeout(0);
return true;
}
this.channel.socket().setSoTimeout((int)(duration));
return false;
} catch (SocketException e) {
throw new LogicFaceException("UdpTransport.setReadTimeout"+e.getMessage());
}
}
}