add test: TcpTransport

This commit is contained in:
free will
2021-05-10 16:24:20 +08:00
parent 6393c7985e
commit cfcd4cbe35
2 changed files with 124 additions and 5 deletions
+12 -5
View File
@@ -13,6 +13,7 @@ import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
@@ -98,7 +99,7 @@ public class StreamTransport extends Transport implements ITransport{
}
int pktTypeLen=pktType.size();
int pktLen=TLV.readVarNumber(buf,new VlInt(pktTypeLen)).getVlIntValue2Int();
int totalPktLen=pktTypeLen+new VlInt(pktLen).getVlIntValue2Int()+pktLen;
int totalPktLen=pktTypeLen+new VlInt(pktLen).size()+pktLen;
if(bufLen>=totalPktLen){
LpPacket lpPacket=this.parseByteArray2LpPacket(
ByteHelper.getLenBytes(buf,0,totalPktLen));
@@ -123,6 +124,7 @@ public class StreamTransport extends Transport implements ITransport{
* @return
*/
private LpPacket doReceive() throws LogicFaceException {
// System.out.println("doReceive param: "+ Arrays.toString(ByteHelper.getLenBytes(this.recvBuf,0, (int) this.recvLen)));
Pair<Long,LpPacket> packetPair=this.readPktAndDeal(
ByteHelper.getLenBytes(this.recvBuf,0, (int) this.recvLen),this.recvLen);
long pktLen=packetPair.getKey();
@@ -147,16 +149,21 @@ public class StreamTransport extends Transport implements ITransport{
public LpPacket receive() throws LogicFaceException {
try {
while (true) {
LpPacket lpPacket = this.receive();
LpPacket lpPacket = this.doReceive();
if (lpPacket != null) {
return lpPacket;
}
int recvRet = this.channel.read(ByteBuffer.wrap(ByteHelper.
getLenBytes(this.recvBuf, (int) this.recvLen, (int) (this.recvBuf.length - this.recvLen))));
// 从channel中接收数据
byte[] cacheBytes=new byte[(int) (this.recvBuf.length - this.recvLen)];
int recvRet = this.channel.read(ByteBuffer.wrap(cacheBytes));
// System.out.println("receive size: "+recvRet);
if(recvRet<0){
return null;
}
this.recvLen+=recvRet;
// System.out.println("receive res:"+Arrays.toString(
// ByteHelper.getLenBytes(cacheBytes,0,recvRet)));
System.arraycopy(cacheBytes,0,this.recvBuf,(int) this.recvLen,recvRet);
this.recvLen += recvRet;
}
} catch (IOException e) {
throw new LogicFaceException("StreamTransport.receive: "+e.getMessage());
@@ -0,0 +1,112 @@
package logicface;
import component.ComponentException;
import encoding.BlockException;
import encoding.EncoderException;
import encoding.SelfEncodingBase;
import mgmt.MgmtException;
import org.junit.Test;
import packet.LpPacket;
import packet.PacketException;
import util.ByteHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
/*
* @Author: Wang Feng
* @Description:
* @Version: 1.0.0
* @Date: 10:38 2021/5/7
* @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室
*/
public class TcpTransportTest {
@Test
public void testSend() throws IOException, LogicFaceException, EncoderException, BlockException, MgmtException, ComponentException, PacketException {
// 初始化一个tcp连接
TcpTransport tcpTransport = new TcpTransport();
SocketChannel socketChannel = SocketChannel.open();
SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 60000);
socketChannel.connect(socketAddress);
tcpTransport.init(socketChannel);
// 发送数据包
LpPacket lpPacket = new LpPacket();
lpPacket.setId(1);
lpPacket.setFragmentNum(1);
lpPacket.setFragmentSeq(0);
lpPacket.setValue(new byte[]{4, 5, 6, 7});
tcpTransport.send(lpPacket);
System.out.println("send: " +
Arrays.toString(new SelfEncodingBase().selfWireEncode(lpPacket).getRaw()));
}
@Test
public void testReceive() throws IOException, LogicFaceException, EncoderException, BlockException, MgmtException, ComponentException, PacketException {
// 初始化一个tcp连接
TcpTransport tcpTransport = new TcpTransport();
SocketChannel socketChannel = SocketChannel.open();
SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 60000);
socketChannel.connect(socketAddress);
tcpTransport.init(socketChannel);
// 发送数据包
LpPacket lpPacket = new LpPacket();
lpPacket.setId(1);
lpPacket.setFragmentNum(1);
lpPacket.setFragmentSeq(0);
lpPacket.setValue(new byte[]{4, 5, 6, 7});
tcpTransport.send(lpPacket);
System.out.println("send: " +
Arrays.toString(new SelfEncodingBase().selfWireEncode(lpPacket).getRaw()));
// 接收数据包
LpPacket newLp = tcpTransport.receive();
if (newLp == null) {
System.out.println("lp null");
}
System.out.println("receive: " +
Arrays.toString(new SelfEncodingBase().selfWireEncode(newLp).getRaw()));
}
/**
* 开启一个tcp服务器,端口号为60000
*/
@Test
public void startTCPServer() {
ServerSocket socket = null;
OutputStream outputStream = null;
try {
//建立基站
socket = new ServerSocket(60000);
//开始开启接收模式,接到后返回客户端的socket对象
Socket client = socket.accept();
byte[] bytes = new byte[1500];
// 循环接收数据,并打印
InputStream inputStream = client.getInputStream();
int len = inputStream.read(bytes);
// inputStream.close();
System.out.println(Arrays.toString(ByteHelper.getLenBytes(bytes, 0, len)));
//获取向客户端发送消息的对象流
outputStream = client.getOutputStream();
//向客户端写数据
outputStream.write(ByteHelper.getLenBytes(bytes, 0, len));
inputStream.close();
outputStream.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}