网络I/o编程模型21 netty的粘包和拆包问题的解决方案

Posted 健康平安的活着

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络I/o编程模型21 netty的粘包和拆包问题的解决方案相关的知识,希望对你有一定的参考价值。

一 问题背景描述

1.1 问题描述

tcp是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket;

客户端为了每次更有效的发送更多的数据给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块然后进行封包。

问题:

这样虽然效率提高了,但是接收端就难于分辨出完整的数据包了,tcp无消息保护边界,需要在接收端处理消息边界问题,也就是我们说的粘包,拆包问题。

1.2 粘包和拆包

假设有两个数据包D1和D2,由于服务端一次读取到直接数是不确定的,所以可能存在以下4种情况:
1.客户端分别发送两个独立的包,D1和D2,没有出现粘包和拆包。

2.服务端一次性接收到了连个数据数据包,D1和D2粘合在一起了,这就是粘包
3.服务端分两次读到了数据,第一次读取完整的D1和D2的一部分,第二次为D2的剩余的内容,这就是拆包。
3.服务端分两次读到了数据,第一次读取D1的一部分,第二次为D1的剩余部分和D2的完整数据,这就是拆包。


二  案例

2.1 客户端代码

1.客户端

public class NettyClient 
    public static void main(String[] args)  throws  Exception
        EventLoopGroup group = new NioEventLoopGroup();
        try 
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NiosocketChannel.class)
                    .handler(new MyClientInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            group.shutdownGracefully();
        
    

2.初始化代码

public class MyClientInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new MyClientHandler());
    

3.handler处理

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> 
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        //使用客户端发送10条数据 hello,server 编号
        for(int i= 0; i< 10; ++i) 
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        
    

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception 
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息=" + message);
        System.out.println("客户端接收消息数量=" + (++this.count));

    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

2.2 服务端代码 

1.服务端代码

public class NettyTcPNianServer 
    public static void main(String[] args) throws Exception

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try 

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        finally 
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        

    

2.初始化代码

public class MyServerInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception 
        ChannelPipeline pipeline =socketChannel.pipeline();
        pipeline.addLast(new MyServerHandler());
    

3.handler

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> 
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception 

        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务器接收到数据 " + message);
        System.out.println("服务器接收到消息量=" + (++this.count));
        //服务器回送数据给客户端, 回送一个随机id ,
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        //cause.printStackTrace();
        ctx.close();
    

2.3 调试

1.启动服务端

2.启动客户端

1.客户端

 2.客户端

三  粘包和拆包解决方案

3.1 解决思路

使用自定义协议+加编码解码,设定每次发送数据的长度,服务器读取数据的长度。避免多读或者少读,造成粘包或者拆包。

3.2 执行流程图

 3.3 代码

3.3.1 自定义协议

public class InfoProtocol 
    private int len; //关键
    private byte[] content;

    public int getLen() 
        return len;
    

    public void setLen(int len) 
        this.len = len;
    

    public byte[] getContent() 
        return content;
    

    public void setContent(byte[] content) 
        this.content = content;
    

3.3.2 解码

public class MyMessageDncoder extends ReplayingDecoder<Void> 
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception 
        System.out.println(" step3: MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码-> MessageProtocol 数据包(对象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
       InfoProtocol messageProtocol = new InfoProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    

3.3.3 编码

public class MyMessageEncoder extends MessageToByteEncoder<InfoProtocol> 
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, InfoProtocol infoProtocol, ByteBuf byteBufOut) throws Exception 
        System.out.println("step2: MyMessageEncoder encode 方法被调用");
        byteBufOut.writeInt(infoProtocol.getLen());
        byteBufOut.writeBytes(infoProtocol.getContent());
    

3.3.4 客户端

public class NettChaBaoClient 
    public static void main(String[] args)  throws  Exception
        EventLoopGroup group = new NioEventLoopGroup();
        try 
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            group.shutdownGracefully();
        
    

3.3.5 客户端-初始化

public class MyClientInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel ch) throws Exception 

        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageEncoder()); //加入编码器
        pipeline.addLast(new MyMessageDncoder()); //加入解码器
        pipeline.addLast(new MyClientHandler());
       // pipeline.addLast(new MyMessageEncoder()); //加入编码器
    

3.3.6  客户端-自定义handler

public class MyClientHandler  extends SimpleChannelInboundHandler<InfoProtocol> 
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        //使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
        for(int i = 0; i< 5; i++) 
            String mes = "北京又出现口罩事件了,去酒吧惹的祸!!!";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));
            int length = mes.getBytes(Charset.forName("utf-8")).length;
            System.out.println("===========step1: client 的handler:。。。。。。");
            //创建协议包对象
            InfoProtocol messageProtocol = new InfoProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        

    

    //    @Override
    protected void channelRead0(ChannelHandlerContext ctx, InfoProtocol msg) throws Exception 
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到消息如下");
        System.out.println("长度=" + len);
        System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
        System.out.println("客户端接收消息数量=" + (++this.count));

    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        System.out.println("异常消息=" + cause.getMessage());
        ctx.close();
    

3.3.7  服务端

public class NettyChaBaoServer 
    public static void main(String[] args) throws Exception
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
            ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    

3.3.8  服务端-初始化

public class MyServerInitializer extends ChannelInitializer<SocketChannel> 
    @Override
    protected void initChannel(SocketChannel ch) throws Exception 
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageDncoder());//解码器
        pipeline.addLast(new MyMessageEncoder());//编码器
        pipeline.addLast(new MyserverHandler());
    

3.3.9  服务端-自定义handler

public class MyserverHandler extends SimpleChannelInboundHandler<InfoProtocol> 
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, InfoProtocol msg) throws Exception 
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println();
        System.out.println("服务器接收到信息如下");
        System.out.println("长度=" + len);
        System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
        System.out.println("服务器接收到消息包数量=" + (++this.count));
        //回复消息
        String responseContent = UUID.randomUUID().toString();
        int responseLen = responseContent.getBytes("utf-8").length;
        byte[]  responseContent2 = responseContent.getBytes("utf-8");
        //构建一个协议包
        InfoProtocol messageProtocol = new InfoProtocol();
        messageProtocol.setLen(responseLen);
        messageProtocol.setContent(responseContent2);
        ctx.writeAndFlush(messageProtocol);
        System.out.println("================================================");
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        //cause.printStackTrace();
        ctx.close();
    


3.3.10  调试

1.服务端

 2.客户端

 

以上是关于网络I/o编程模型21 netty的粘包和拆包问题的解决方案的主要内容,如果未能解决你的问题,请参考以下文章

tcp的粘包和拆包示例以及使用LengthFieldFrameDecoder来解决的方法

解决粘包和拆包问题

服务端NETTY 客户端非NETTY处理粘包和拆包的问题

TCP粘包拆包基本解决方案

TCP粘包和拆包

Netty解决粘包和拆包问题的四种方案