NettyNetty编解码&粘包拆包&心跳机制&断线自动重连

Posted Cry丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NettyNetty编解码&粘包拆包&心跳机制&断线自动重连相关的知识,希望对你有一定的参考价值。

Netty编解码

Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等,先大概了解下这几个组件的作用。

ChannelHandler

ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。你的业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。

ChannelPipeline

ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为入站的,入站只调用pipeline里的ChannelInboundHandler逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。

编码解码器

当你通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节

Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。

如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。

protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:

引入依赖:

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-api</artifactId>
    <version>1.0.10</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.10</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.10</version>
</dependency>

protostuff使用示例:

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * protostuff 序列化工具类,基于protobuf封装
 */
public class ProtostuffUtil 

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    private static <T> Schema<T> getSchema(Class<T> clazz) 
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) 
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) 
                cachedSchema.put(clazz, schema);
            
        
        return schema;
    

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) 
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try 
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
         catch (Exception e) 
            throw new IllegalStateException(e.getMessage(), e);
         finally 
            buffer.clear();
        
    

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) 
        try 
            T obj = clazz.newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
         catch (Exception e) 
            throw new IllegalStateException(e.getMessage(), e);
        
    

    public static void main(String[] args) 
        byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge"));
        User user = ProtostuffUtil.deserializer(userBytes, User.class);
        System.out.println(user);
    

在发送端NettyClientHandler调用编码

public class NettyClientHandler extends ChannelInboundHandlerAdapter 

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        System.out.println("收到服务器消息:" + msg);
    

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        System.out.println("MyClientHandler发送数据");
        //ctx.writeAndFlush("测试String编解码");
        //测试对象编解码
        //ctx.writeAndFlush(new User(1,"zhuge"));
        //测试用protostuff对对象编解码
        ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serializer(new User(1, "cry777")));
        ctx.writeAndFlush(buf);

    

在接收端NettyServerHandler调用解码

public class NettyServerHandler extends ChannelInboundHandlerAdapter 

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        //System.out.println("从客户端读取到String:" + msg.toString());
        //System.out.println("从客户端读取到Object:" + ((User)msg).toString());
         //测试用protostuff对对象编解码
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        System.out.println("从客户端读取到Object:" + ProtostuffUtil.deserializer(bytes, User.class));
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

Netty粘包拆包

TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。

解决方案

1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格

2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符。

3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。

Netty提供了多个解码器,可以进行分包的操作,如下:
LineBasedFrameDecoder (回车换行分包)
DelimiterBasedFrameDecoder(特殊分隔符分包)
FixedLengthFrameDecoder(固定长度报文来分包)

//加入特殊分隔符分包解码器
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("_".getBytes())));

自定义协议包(例如约定包长度):

/**
 * 自定义协议包
 */
public class MyMessageProtocol 

    //定义一次发送包体长度
    private int len;
    //一次发送包体内容
    private byte[] content;

    public int getLen() 
        return len;
    

    public void setLen(int len) 
        this.len = len;
    

    public byte[] getContent() 
        return content;
    

    public void setContent(byte[] content) 
        this.content = content;
    

自定义编码器

public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> 
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception 
        System.out.println("MyMessageEncoder encode 方法被调用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    

自定义解码器

public class MyMessageDecoder extends ByteToMessageDecoder 

    int length = 0;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception 
        System.out.println();
        System.out.println("MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码-> MyMessageProtocol 数据包(对象)
        System.out.println(in);

        if(in.readableBytes() >= 4) 
            if (length == 0)
                length = in.readInt();
            
            if (in.readableBytes() < length) 
                System.out.println("当前可读数据不够,继续等待。。");
                return;
            
            byte[] content = new byte[length];
            if (in.readableBytes() >= length)
                in.readBytes(content);

                //封装成MyMessageProtocol对象,传递到下一个handler业务处理
                MyMessageProtocol messageProtocol = new MyMessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);
                out.add(messageProtocol);
            
            length = 0;
        
    

ChannelInboundHandler

public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> 

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        for(int i = 0; i< 2; i++) 
            String msg = "你好,我是张三!";
            //创建协议包对象
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
            messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(messageProtocol);
        
    

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception 

    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    


Client

public class MyClient 
    public static void main(String[] args)  throws  Exception

        EventLoopGroup group = new NioEventLoopGroup();

        try 
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NiosocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception 
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new MyMessageEncoder());
                            pipeline.addLast(new MyClientHandler());
                        
                    );

            System.out.println("netty client start。。");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        finally 
            group.shutdownGracefully();
        
    

ChannelInboundHandler

public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> 

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception 
        System.out.println("====服务端接收到消息如下====");
        System.out.println("长度=" + msg.getLen());
        System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));

        System.out.println("服务端接收到消息包数量=" + (++this.count));
    

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        cause.printStackTrace();
        ctx.close();
    

Server

public class MyServer 
    public static void main(String[] args) throws Exception 

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            ServerBootstrap serverBootstrap = new ServerBootstrap()以上是关于NettyNetty编解码&粘包拆包&心跳机制&断线自动重连的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包)

Netty之启动类编解码器等源码解析及粘包拆包问题

Netty框架之编解码机制二(自定义协议)

Netty框架之编解码机制二(自定义协议)

Netty框架之编解码机制二(自定义协议)

netty之粘包拆包ByteToMessageDecoder