Netty入门解决TCP粘包/分包的实例

Posted SCLM安全团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty入门解决TCP粘包/分包的实例相关的知识,希望对你有一定的参考价值。

回顾TCP粘包/分包问题的解决方法

1.消息定长

2.在包尾都增加特殊字符进行分割

3.将消息分为消息头和消息体

针对这三种方法,下面我会分别举例验证

FixedLengthFrameDecoder类

对应第一种解决方法:消息定长

(1)例1:服务端代码:

public class Server4 {
    public static void main(String[] args) throws SigarException {        //boss线程监听端口,worker线程负责数据读写
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();        try{            //辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();            //设置线程池
            bootstrap.group(boss,worker);            //设置socket工厂
            bootstrap.channel(NioserverSocketChannel.class);            //设置管道工厂
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();                    //定长解码类
                    pipeline.addLast(new FixedLengthFrameDecoder(19));                    //字符串解码类
                    pipeline.addLast(new StringDecoder());                    //处理类
                    pipeline.addLast(new ServerHandler4());
                }
            });            //绑定端口
            ChannelFuture future = bootstrap.bind(8866).sync();
            System.out.println("server start ...... ");            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {            //优雅退出,释放线程池资源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

class ServerHandler4 extends SimpleChannelInboundHandler <String>{    //用于记录次数
    private int count = 0;    //读取客户端发送的数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("RESPONSE--------"+msg+";"+"   @ "+ ++count);

    }    //新客户端接入
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }    //客户端断开
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }    //异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //关闭通道
        ctx.channel().close();        //打印异常
        cause.printStackTrace();
    }
}
 
   
   
 

(2)例1:客户端代码:

public class Client4 {

    public static void main(String[] args) {        //worker负责读写数据
        EventLoopGroup worker = new NioEventLoopGroup();        try {            //辅助启动类
            Bootstrap bootstrap = new Bootstrap();            //设置线程池
            bootstrap.group(worker);            //设置socket工厂
            bootstrap.channel(NioSocketChannel.class);            //设置管道
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();                    //定长解码类
                    pipeline.addLast(new FixedLengthFrameDecoder(19));                    //字符串编码类
                    pipeline.addLast(new StringEncoder());                    //处理类
                    pipeline.addLast(new ClientHandler4());
                }
            });            //发起异步连接操作
            ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync();            //等待客户端链路关闭
            futrue.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {            //优雅的退出,释放NIO线程组
            worker.shutdownGracefully();
        }
    }

}

class ClientHandler4 extends SimpleChannelInboundHandler<String> {    //接受服务端发来的消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("server response : "+msg);
    }    //与服务器建立连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //给服务器发消息
        String s = System.getProperty("line.separator");        //发送50次消息
        for (int i = 0; i < 50; i++) {
            ctx.channel().writeAndFlush("  I am client    "+s);
        }
    }    //与服务器断开连接
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }    //异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //关闭管道
        ctx.channel().close();        //打印异常信息
        cause.printStackTrace();
    }

}
 
   
   
 

例1服务端运行结果:

…………………….此处省略多行…………………. 
【Netty入门】解决TCP粘包/分包的实例

分析:从运行结果可以看出,符合我们的预期,并没有TCP粘包问题,这是因为使用的定长解码器的原因,我在此解释一下例1client/server代码中新增加了几个“陌生”的类,若之后再次出现,则不作解释!

  • FixedLengthFrameDecoder类:用于固定长度消息的粘包分包处理,可以携带参数,我在代码中指定的参数为19,因为我要发送的字符长度为19。

  • StringDecoder类 :用于字符串的解码。

  • StringEncoder类 :用于字符串的编码。

LineBasedFrameDecoder类

对应第二种解决方法:在包尾都增加特殊字符(行分隔符)进行分割

(1)例2:服务端代码:

public class Server4 {
    public static void main(String[] args) throws SigarException {        //boss线程监听端口,worker线程负责数据读写
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();        try{            //辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();            //设置线程池
            bootstrap.group(boss,worker);            //设置socket工厂
            bootstrap.channel(NioServerSocketChannel.class);            //设置管道工厂
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();                    //行分隔符解码类
                    pipeline.addLast(new LineBasedFrameDecoder(1024));                    //字符串解码类
                    pipeline.addLast(new StringDecoder());                    //处理类
                    pipeline.addLast(new ServerHandler4());
                }
            });            //绑定端口
            ChannelFuture future = bootstrap.bind(8866).sync();
            System.out.println("server start ...... ");            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {            //优雅退出,释放线程池资源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

class ServerHandler4 extends SimpleChannelInboundHandler <String>{    //用于记录次数
    private int count = 0;    //读取客户端发送的数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("RESPONSE--------"+msg+";"+"   @ "+ ++count);

    }    //新客户端接入
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }    //客户端断开
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }    //异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //关闭通道
        ctx.channel().close();        //打印异常
        cause.printStackTrace();
    }
}
 
   
   
 

(2)例2:客户端代码:

public class Client4 {

    public static void main(String[] args) {        //worker负责读写数据
        EventLoopGroup worker = new NioEventLoopGroup();        try {            //辅助启动类
            Bootstrap bootstrap = new Bootstrap();            //设置线程池
            bootstrap.group(worker);            //设置socket工厂
            bootstrap.channel(NioSocketChannel.class);            //设置管道
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();                    //行分隔符解码类
                    pipeline.addLast(new LineBasedFrameDecoder(1024));                    //字符串编码类
                    pipeline.addLast(new StringEncoder());                    //处理类
                    pipeline.addLast(new ClientHandler4());
                }
            });            //发起异步连接操作
            ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync();            //等待客户端链路关闭
            futrue.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {            //优雅的退出,释放NIO线程组
            worker.shutdownGracefully();
        }
    }

}

class ClientHandler4 extends SimpleChannelInboundHandler<String> {    //接受服务端发来的消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("server response : "+msg);
    }    //与服务器建立连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //给服务器发消息
        String s = System.getProperty("line.separator");        //发送50次消息
        for (int i = 0; i < 50; i++) {
            ctx.channel().writeAndFlush("  I am client    "+s);
        }
    }    //与服务器断开连接
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }    //异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //关闭管道
        ctx.channel().close();        //打印异常信息
        cause.printStackTrace();
    }

}
 
   
   
 

例2服务端运行结果:

【Netty入门】解决TCP粘包/分包的实例

…………………….此处省略多行…………………. 
【Netty入门】解决TCP粘包/分包的实例

分析:从运行结果可以看出没有TCP粘包问题了,细心的你或许已经发现代码中新出现了一个LineBasedFrameDecoder类,它可以携带参数,我指定的参数为1024,含义为在每1024个字节中寻找换行符,若有,就发送消息,否则继续寻找。

DelimiterBasedFrameDecoder类

对应第二种解决方法:在包尾都增加特殊字符(#)进行分割

例3:服务端代码,和例2服务端代码类似,由于篇幅有限,我就仅仅指出它们不一样的地方了!

将例2服务端 第23行 和第24行 代码修改为

//自定义分隔符解码类pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));
 
   
   
 
  • 1

  • 2

例3:客户端代码:(和例2客户端代码的不同之处)

将例2客户端 第24行 和第25行 代码修改为

//自定义分隔符解码类pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));
 
   
   
 
  • 1

  • 2

再将例2客户端 第63行 代码修改为

ctx.channel().writeAndFlush("  I am  client    "+"#");
 
   
   
 
  • 1

例3服务端运行结果:

【Netty入门】解决TCP粘包/分包的实例

…………………….此处省略多行…………………. 
【Netty入门】解决TCP粘包/分包的实例

分析:从运行结果可以看出TCP粘包问题解决了!代码中新出现了一个DelimiterBasedFrameDecoder类,它可以携带参数,我指定的参数为(1024,Unpooled.copiedBuffer(“#”.getBytes()))),含义为在每1024个字节中寻找#,若有,就发送消息,否则继续寻找。

MyRequestDecoder自定义类

对应第三种方法:将消息分为消息头和消息体

对于消息头和消息体,有三种情况分别如下:

  • 有头部的拆包与粘包:

    lengthFieldOffset = 2 长度字段偏移量 ( = 外部头部Header 1的长度) 
    lengthFieldLength = 3 长度字段占用字节数 
    lengthAdjustment = 0 
    initialBytesToStrip = 0

  • 【Netty入门】解决TCP粘包/分包的实例

  • 长度字段在前且有头部的拆包与粘包:

    lengthFieldOffset = 0 长度字段偏移量 
    lengthFieldLength = 3 长度字段占用字节数 
    lengthAdjustment = 2 ( Header 1 的长度) 
    initialBytesToStrip = 0

  • 多扩展头部的拆包与粘包: 
    lengthFieldOffset = 1 长度字段偏移量(=头HDR1的长度) 
    lengthFieldLength = 2 长度字段占用字节数 
    lengthAdjustment = 1 调整长度(= 头HDR2的长度) 
    initialBytesToStrip = 3 排除的偏移量(= the length of HDR1 + LEN)


  • 举一个简单的例子

例4:

import netty.EnDecode.Request;/** * 请求解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头   | 模块号    | 命令号 |  长度   |   数据   | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */public class MyRequestDecoder extends FrameDecoder{

    //数据包基本长度
    public static final int BASE_LENTH = 4 + 2 + 2 + 4;    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {        //可读长度必须大于基本长度
        if(buffer.readableBytes() >= BASE_LENTH){            //防止socket字节流攻击
            if(buffer.readableBytes() > 2048){
                buffer.skipBytes(buffer.readableBytes());
            }            //记录包头开始的index
            int beginReader;            while(true){
                beginReader = buffer.readerIndex();
                buffer.markReaderIndex();                if(buffer.readInt() == -32523523){                    break;
                }                //未读到包头,略过一个字节
                buffer.resetReaderIndex();
                buffer.readByte();                //长度又变得不满足
                if(buffer.readableBytes() < BASE_LENTH){                    return null;
                }
            }            //模块号
            short module = buffer.readShort();            //命令号
            short cmd = buffer.readShort();            //长度
            int length = buffer.readInt();            //判断请求数据包数据是否到齐
            if(buffer.readableBytes() < length){                //还原读指针
                buffer.readerIndex(beginReader);                return null;
            }            //读取data数据
            byte[] data = new byte[length];
            buffer.readBytes(data);

            Request request = new Request();
            request.setModule(module);
            request.setCmd(cmd);
            request.setData(data);            //继续往下传递 
            return request;

        }        //数据包不完整,需要等待后面的包来
        return null;
    }

}
 
   
   
 


以上是关于Netty入门解决TCP粘包/分包的实例的主要内容,如果未能解决你的问题,请参考以下文章

Netty4.xNetty TCP粘包/拆包问题的解决办法

12.netty中tcp粘包拆包问题及解决方法

12.netty中tcp粘包拆包问题及解决方法

详解啥是 TCP 粘包和拆包现象并演示 Netty 是如何解决的

十二.Netty入门到超神系列-TCP粘包拆包处理

Netty——解决TCP粘包、拆包