Netty入门解决TCP粘包/分包的实例
Posted SCLM安全团队
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty入门解决TCP粘包/分包的实例相关的知识,希望对你有一定的参考价值。
回顾TCP粘包/分包问题的解决方法
1.消息定长
2.在包尾都增加特殊字符进行分割
3.将消息分为消息头和消息体
针对这三种方法,下面我会分别举例验证
FixedLengthFrameDecoder类
对应第一种解决方法:消息定长
(1)例1:服务端代码:
public class Server4 {
public static void main(String[] args) throws SigarException { //boss线程监听端口,worker线程负责数据读写
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(); try{ //辅助启动类
ServerBootstrap bootstrap = new ServerBootstrap(); //设置线程池
bootstrap.group(boss,worker); //设置socket工厂
bootstrap.channel(NioserverSocketChannel.class); //设置管道工厂
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel socketChannel) throws Exception { //获取管道
ChannelPipeline pipeline = socketChannel.pipeline(); //定长解码类
pipeline.addLast(new FixedLengthFrameDecoder(19)); //字符串解码类
pipeline.addLast(new StringDecoder()); //处理类
pipeline.addLast(new ServerHandler4());
}
}); //绑定端口
ChannelFuture future = bootstrap.bind(8866).sync();
System.out.println("server start ...... "); //等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally { //优雅退出,释放线程池资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
class ServerHandler4 extends SimpleChannelInboundHandler <String>{ //用于记录次数
private int count = 0; //读取客户端发送的数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("RESPONSE--------"+msg+";"+" @ "+ ++count);
} //新客户端接入
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
} //客户端断开
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
} //异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭通道
ctx.channel().close(); //打印异常
cause.printStackTrace();
}
}
(2)例1:客户端代码:
public class Client4 {
public static void main(String[] args) { //worker负责读写数据
EventLoopGroup worker = new NioEventLoopGroup(); try { //辅助启动类
Bootstrap bootstrap = new Bootstrap(); //设置线程池
bootstrap.group(worker); //设置socket工厂
bootstrap.channel(NioSocketChannel.class); //设置管道
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel socketChannel) throws Exception { //获取管道
ChannelPipeline pipeline = socketChannel.pipeline(); //定长解码类
pipeline.addLast(new FixedLengthFrameDecoder(19)); //字符串编码类
pipeline.addLast(new StringEncoder()); //处理类
pipeline.addLast(new ClientHandler4());
}
}); //发起异步连接操作
ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync(); //等待客户端链路关闭
futrue.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally { //优雅的退出,释放NIO线程组
worker.shutdownGracefully();
}
}
}
class ClientHandler4 extends SimpleChannelInboundHandler<String> { //接受服务端发来的消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server response : "+msg);
} //与服务器建立连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { //给服务器发消息
String s = System.getProperty("line.separator"); //发送50次消息
for (int i = 0; i < 50; i++) {
ctx.channel().writeAndFlush(" I am client "+s);
}
} //与服务器断开连接
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
} //异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭管道
ctx.channel().close(); //打印异常信息
cause.printStackTrace();
}
}
例1服务端运行结果:
…………………….此处省略多行………………….
分析:从运行结果可以看出,符合我们的预期,并没有TCP粘包问题,这是因为使用的定长解码器的原因,我在此解释一下例1client/server代码中新增加了几个“陌生”的类,若之后再次出现,则不作解释!
FixedLengthFrameDecoder类:用于固定长度消息的粘包分包处理,可以携带参数,我在代码中指定的参数为19,因为我要发送的字符长度为19。
StringDecoder类 :用于字符串的解码。
StringEncoder类 :用于字符串的编码。
LineBasedFrameDecoder类
对应第二种解决方法:在包尾都增加特殊字符(行分隔符)进行分割
(1)例2:服务端代码:
public class Server4 {
public static void main(String[] args) throws SigarException { //boss线程监听端口,worker线程负责数据读写
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(); try{ //辅助启动类
ServerBootstrap bootstrap = new ServerBootstrap(); //设置线程池
bootstrap.group(boss,worker); //设置socket工厂
bootstrap.channel(NioServerSocketChannel.class); //设置管道工厂
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel socketChannel) throws Exception { //获取管道
ChannelPipeline pipeline = socketChannel.pipeline(); //行分隔符解码类
pipeline.addLast(new LineBasedFrameDecoder(1024)); //字符串解码类
pipeline.addLast(new StringDecoder()); //处理类
pipeline.addLast(new ServerHandler4());
}
}); //绑定端口
ChannelFuture future = bootstrap.bind(8866).sync();
System.out.println("server start ...... "); //等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally { //优雅退出,释放线程池资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
class ServerHandler4 extends SimpleChannelInboundHandler <String>{ //用于记录次数
private int count = 0; //读取客户端发送的数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("RESPONSE--------"+msg+";"+" @ "+ ++count);
} //新客户端接入
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
} //客户端断开
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
} //异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭通道
ctx.channel().close(); //打印异常
cause.printStackTrace();
}
}
(2)例2:客户端代码:
public class Client4 {
public static void main(String[] args) { //worker负责读写数据
EventLoopGroup worker = new NioEventLoopGroup(); try { //辅助启动类
Bootstrap bootstrap = new Bootstrap(); //设置线程池
bootstrap.group(worker); //设置socket工厂
bootstrap.channel(NioSocketChannel.class); //设置管道
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel socketChannel) throws Exception { //获取管道
ChannelPipeline pipeline = socketChannel.pipeline(); //行分隔符解码类
pipeline.addLast(new LineBasedFrameDecoder(1024)); //字符串编码类
pipeline.addLast(new StringEncoder()); //处理类
pipeline.addLast(new ClientHandler4());
}
}); //发起异步连接操作
ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync(); //等待客户端链路关闭
futrue.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally { //优雅的退出,释放NIO线程组
worker.shutdownGracefully();
}
}
}
class ClientHandler4 extends SimpleChannelInboundHandler<String> { //接受服务端发来的消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server response : "+msg);
} //与服务器建立连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { //给服务器发消息
String s = System.getProperty("line.separator"); //发送50次消息
for (int i = 0; i < 50; i++) {
ctx.channel().writeAndFlush(" I am client "+s);
}
} //与服务器断开连接
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
} //异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭管道
ctx.channel().close(); //打印异常信息
cause.printStackTrace();
}
}
例2服务端运行结果:
…………………….此处省略多行………………….
分析:从运行结果可以看出没有TCP粘包问题了,细心的你或许已经发现代码中新出现了一个LineBasedFrameDecoder类,它可以携带参数,我指定的参数为1024,含义为在每1024个字节中寻找换行符,若有,就发送消息,否则继续寻找。
DelimiterBasedFrameDecoder类
对应第二种解决方法:在包尾都增加特殊字符(#)进行分割
例3:服务端代码,和例2服务端代码类似,由于篇幅有限,我就仅仅指出它们不一样的地方了!
将例2服务端 第23行 和第24行 代码修改为
//自定义分隔符解码类pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));
1
2
例3:客户端代码:(和例2客户端代码的不同之处)
将例2客户端 第24行 和第25行 代码修改为
//自定义分隔符解码类pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));
1
2
再将例2客户端 第63行 代码修改为
ctx.channel().writeAndFlush(" I am client "+"#");
1
例3服务端运行结果:
…………………….此处省略多行………………….
分析:从运行结果可以看出TCP粘包问题解决了!代码中新出现了一个DelimiterBasedFrameDecoder类,它可以携带参数,我指定的参数为(1024,Unpooled.copiedBuffer(“#”.getBytes()))),含义为在每1024个字节中寻找#,若有,就发送消息,否则继续寻找。
MyRequestDecoder自定义类
对应第三种方法:将消息分为消息头和消息体
对于消息头和消息体,有三种情况分别如下:
有头部的拆包与粘包:
lengthFieldOffset = 2 长度字段偏移量 ( = 外部头部Header 1的长度)
lengthFieldLength = 3 长度字段占用字节数
lengthAdjustment = 0
initialBytesToStrip = 0长度字段在前且有头部的拆包与粘包:
lengthFieldOffset = 0 长度字段偏移量
lengthFieldLength = 3 长度字段占用字节数
lengthAdjustment = 2 ( Header 1 的长度)
initialBytesToStrip = 0多扩展头部的拆包与粘包:
lengthFieldOffset = 1 长度字段偏移量(=头HDR1的长度)
lengthFieldLength = 2 长度字段占用字节数
lengthAdjustment = 1 调整长度(= 头HDR2的长度)
initialBytesToStrip = 3 排除的偏移量(= the length of HDR1 + LEN)举一个简单的例子
例4:
import netty.EnDecode.Request;/** * 请求解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */public class MyRequestDecoder extends FrameDecoder{
//数据包基本长度
public static final int BASE_LENTH = 4 + 2 + 2 + 4; @Override
protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { //可读长度必须大于基本长度
if(buffer.readableBytes() >= BASE_LENTH){ //防止socket字节流攻击
if(buffer.readableBytes() > 2048){
buffer.skipBytes(buffer.readableBytes());
} //记录包头开始的index
int beginReader; while(true){
beginReader = buffer.readerIndex();
buffer.markReaderIndex(); if(buffer.readInt() == -32523523){ break;
} //未读到包头,略过一个字节
buffer.resetReaderIndex();
buffer.readByte(); //长度又变得不满足
if(buffer.readableBytes() < BASE_LENTH){ return null;
}
} //模块号
short module = buffer.readShort(); //命令号
short cmd = buffer.readShort(); //长度
int length = buffer.readInt(); //判断请求数据包数据是否到齐
if(buffer.readableBytes() < length){ //还原读指针
buffer.readerIndex(beginReader); return null;
} //读取data数据
byte[] data = new byte[length];
buffer.readBytes(data);
Request request = new Request();
request.setModule(module);
request.setCmd(cmd);
request.setData(data); //继续往下传递
return request;
} //数据包不完整,需要等待后面的包来
return null;
}
}
以上是关于Netty入门解决TCP粘包/分包的实例的主要内容,如果未能解决你的问题,请参考以下文章