fixed bug: face timeout 失效问题

This commit is contained in:
free will
2021-05-13 21:59:54 +08:00
parent 69d77d9574
commit cd610d7b5e
4 changed files with 79 additions and 53 deletions
+14 -5
View File
@@ -150,20 +150,29 @@ public class StreamTransport extends Transport implements ITransport{
if (lpPacket != null) {
return lpPacket;
}
// 从channel中接收数据
// 支持触发超时机制的数据读取。这是第二种写法,可以正常触发超时机制
byte[] cacheBytes=new byte[(int) (this.recvBuf.length - this.recvLen)];
int recvRet = this.channel.read(ByteBuffer.wrap(cacheBytes));
// System.out.println("receive size: "+recvRet);
int recvRet = this.channel.socket().getInputStream().read(cacheBytes);
if(recvRet<0){
return null;
}
System.arraycopy(cacheBytes,0,this.recvBuf,(int) this.recvLen,recvRet);
// // channel直接读取是无法触发超时机制的,所以注释掉这第一种写法
// 从channel中接收数据
// byte[] cacheBytes=new byte[(int) (this.recvBuf.length - this.recvLen)];
// int recvRet = this.channel.read(ByteBuffer.wrap(cacheBytes));
//// System.out.println("receive size: "+recvRet);
// if(recvRet<0){
// return null;
// }
// System.out.println("receive res:"+Arrays.toString(
// ByteHelper.getLenBytes(cacheBytes,0,recvRet)));
System.arraycopy(cacheBytes,0,this.recvBuf,(int) this.recvLen,recvRet);
// System.arraycopy(cacheBytes,0,this.recvBuf,(int) this.recvLen,recvRet);
this.recvLen += recvRet;
}
} catch (IOException e) {
throw new LogicFaceException("StreamTransport.receive: "+e.getMessage());
throw new LogicFaceException("StreamTransport.receive: "+e.getMessage()+" ; Cause: read timeout.");
}
}
+13 -4
View File
@@ -90,12 +90,21 @@ public class UdpTransport extends Transport implements ITransport{
*/
private LpPacket doReceive() throws LogicFaceException {
try {
ByteBuffer bf = ByteBuffer.wrap(this.recvBuf);
int readLen=this.channel.read(bf);
// 支持触发超时机制的数据读取。这是第二种写法,可以正常触发超时机制
DatagramPacket receivePacket=new DatagramPacket(this.recvBuf,this.recvBuf.length);
this.channel.socket().receive(receivePacket);
int readLen=receivePacket.getLength();
if(readLen<=0){
return null;
}
byte[] readBytes= ByteHelper.getLenBytes(bf.array(),0,readLen);
byte[] readBytes=ByteHelper.getLenBytes(this.recvBuf,0,readLen);
// // channel直接读取是无法触发超时机制的,所以注释掉这第一种写法
// ByteBuffer bf = ByteBuffer.wrap(this.recvBuf);
// int readLen=this.channel.read(bf);
// if(readLen<=0){
// return null;
// }
// byte[] readBytes= ByteHelper.getLenBytes(bf.array(),0,readLen);
LoggerHelper.logger.debug("recv from udp: "+this.remoteAddr);
LpPacket lpPacket=this.parseByteArray2LpPacket(readBytes);
if(lpPacket==null){
@@ -103,7 +112,7 @@ public class UdpTransport extends Transport implements ITransport{
}
return lpPacket;
} catch (IOException e) {
throw new LogicFaceException("UdpTransport.doReceive: "+e.getMessage());
throw new LogicFaceException("UdpTransport.doReceive: "+e.getMessage()+" ; Cause: read timeout.");
}
}
+47 -44
View File
@@ -106,6 +106,7 @@ public class LogicFaceTest {
}
}
// 在这里验证超时时不能接收到TCP包
@Test
public void testReceiveDataByTcp() throws ComponentException, LogicFaceException, EncoderException, BlockException, MgmtException, PacketException {
Data data=new Data();
@@ -121,7 +122,7 @@ public class LogicFaceTest {
// 等待两秒钟,接收数据包
try {
Data data1 = face.receiveData(2000);
Data data1 = face.receiveData(6000);
System.out.println(Arrays.toString(
new SelfEncodingBase().selfWireEncode(data1).getRaw()));
}catch (Exception e){
@@ -129,27 +130,27 @@ public class LogicFaceTest {
}
}
// todo: 在这里测试为何超时依然能接收到UDP包
// 在这里验证超时时不能接收到UDP包
@Test
public void testReceiveDataByUdp() throws ComponentException, LogicFaceException, EncoderException, BlockException, MgmtException, PacketException {
Data data=new Data();
byte[] value={(byte)132,(byte)221,(byte)223,(byte)25};
data.payload.setValue(value);
data.congestionMark.setCongestionLevel(Long.MAX_VALUE);
Identifier name=new Identifier("/wefree");
data.setName(name);
LogicFace face=new LogicFace();
face.initWithUdp("127.0.0.1",50000);
face.sendData(data);
// 等待两秒钟,接收数据包
public void testReceiveDataByUdp() {
try {
Data data1 = face.receiveData(2000);
Data data=new Data();
byte[] value={(byte)132,(byte)221,(byte)223,(byte)25};
data.payload.setValue(value);
data.congestionMark.setCongestionLevel(Long.MAX_VALUE);
Identifier name=new Identifier("/wefree");
data.setName(name);
LogicFace face=new LogicFace();
face.initWithUdp("127.0.0.1",50000);
face.sendData(data);
// 等待两秒钟,接收数据包
Data data1 = face.receiveData(4000);
System.out.println(Arrays.toString(
new SelfEncodingBase().selfWireEncode(data1).getRaw()));
}catch (Exception e){
e.printStackTrace();
}catch (LogicFaceException | ComponentException | EncoderException | BlockException | PacketException | MgmtException e){
System.out.println(e.getMessage());
}
}
@@ -204,39 +205,41 @@ public class LogicFaceTest {
/**
* 开启一个tcp服务器,端口号为60000
*/
// @Test
@Test
public void startTCPServer() {
ServerSocket socket = null;
OutputStream outputStream = null;
try {
//建立基站
socket = new ServerSocket(60000);
//开始开启接收模式,接到后返回客户端的socket对象
Socket client = socket.accept();
while(true) {
ServerSocket socket = null;
OutputStream outputStream = null;
try {
//建立基站
socket = new ServerSocket(60000);
//开始开启接收模式,接到后返回客户端的socket对象
Socket client = socket.accept();
byte[] bytes = new byte[1500];
byte[] bytes = new byte[1500];
// 循环接收数据,并打印
InputStream inputStream = client.getInputStream();
int len = inputStream.read(bytes);
// 接收数据,并打印
InputStream inputStream = client.getInputStream();
int len = inputStream.read(bytes);
// inputStream.close();
System.out.println(Arrays.toString(ByteHelper.getLenBytes(bytes, 0, len)));
System.out.println(Arrays.toString(ByteHelper.getLenBytes(bytes, 0, len)));
// 等待三秒,再发送应答
Thread.sleep(5000);
// 等待,再发送应答
Thread.sleep(5000);
//获取向客户端发送消息的对象流
outputStream = client.getOutputStream();
//向客户端写数据
outputStream.write(ByteHelper.getLenBytes(bytes, 0, len));
System.out.println("writed to client: "+Arrays.toString(ByteHelper.getLenBytes(bytes, 0, len)));
//获取向客户端发送消息的对象流
outputStream = client.getOutputStream();
//向客户端写数据
outputStream.write(ByteHelper.getLenBytes(bytes, 0, len));
System.out.println("writed to client: " + Arrays.toString(ByteHelper.getLenBytes(bytes, 0, len)));
Thread.sleep(200000);
inputStream.close();
outputStream.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(200000);
inputStream.close();
outputStream.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
+5
View File
@@ -32,8 +32,13 @@ public class UDPSendDemo {
// 另一种方式接收数据
byte[] receiveData=new byte[1000];
DatagramPacket newP=new DatagramPacket(receiveData,1000);
int len=channel.socket().getReceiveBufferSize();
channel.socket().receive(newP);
int length=newP.getLength();
System.out.println("len: "+len);
System.out.println("length: "+length);
System.out.println("收到应答DatagramPacket1: "+Arrays.toString(newP.getData()));
System.out.println("收到应答DatagramPacket1-byte[]: "+Arrays.toString(receiveData));
// 设置读写超时时间
channel.socket().setSoTimeout(2000);