Netty框架之编解码机制二(自定义协议)

Posted 木兮君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之编解码机制二(自定义协议)相关的知识,希望对你有一定的参考价值。

前言

自上篇文章Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包),小编将继续讲解netty中的编解码,以及tcp拆包粘包的解决方案代码实践,希望对大家理解有所帮助。好了话不多说进入正题。

拆包粘包的解决方案代码实践

上篇文章分享了一系列解决粘包拆包的方案,下面用代码来编写一些。

固定长度
换行
自定义分割符号

public class PacketSplicingTest 
    private ServerBootstrap serverBootstrap;

    @Before
    public void initSocketServer() 
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(5));
        serverBootstrap.channel(NioserverSocketChannel.class);
    

    @Test
    public void splicingTest() throws InterruptedException 

        serverBootstrap.childHandler(new ChannelInitializer<Channel>() 
            @Override
            protected void initChannel(Channel ch) throws Exception 
            	//使用固定长度 如果没有到4的长度则下面不会打印
                //ch.pipeline().addLast(new FixedLengthFrameDecoder(4));
				//换行分割,并且一行最大的长度为10
				//ch.pipeline().addLast(new LineBasedFrameDecoder(10));
				//特殊符号分割,下面示例为以#分割,最大读长度为10的字符串,中间true为自动会将#丢弃(即丢弃分隔符)
				ByteBuf byteBuf = Unpooled.wrappedBuffer(new byte[]'#');
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10,true,byteBuf));
                ch.pipeline().addLast(new TrackHandler());
            
        );
        ChannelFuture sync = serverBootstrap.bind(8080).sync();
        sync.channel().closeFuture().sync();

    

    private class TrackHandler extends SimpleChannelInboundHandler 
        int count = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception 
            ByteBuf byteBuf = (ByteBuf) msg;
            String message = byteBuf.toString(Charset.defaultCharset());
            System.out.println(String.format("message%s:%s", ++count, message));
        
    

看完上述比较简单的应用后,小编带大家编写一个自定义协议简单示例。

自定义协议

首先咱们来自定义一个协议的报文,以及编解码实现流程


是不是非常简单,那么现在小编来开始愉快的编写源码。
首先是编解码器

public class ProtocolCodes extends ByteToMessageCodec<String> 
    //标识符
    private static int MAGIC = 0xDDDD;
    //标识符 的bytebuf
    private static ByteBuf MAGIC_BUF = Unpooled.copyInt(MAGIC);

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String msg, ByteBuf byteBuf) throws Exception 
        byte[] bytes = msg.getBytes();
        byteBuf.writeInt(MAGIC);
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception 
        //是否有标识符
        int indexOfMagic = indexOf(byteBuf, MAGIC_BUF);
        if (indexOfMagic < 0) 
            return; //需要更多的字节
        
        //消息头的长度
        if (!byteBuf.isReadable(indexOfMagic + 8)) 
            return;//需要更多的字节
        
        //读取消息头中消息长度
        int length = byteBuf.slice(indexOfMagic + 4, 4).readInt();
        //是否可读完整的消息体
        if (!byteBuf.isReadable(indexOfMagic + 8 + length)) 
            return;//需要更多的字节
        
        //跳过消息头
        byteBuf.skipBytes(indexOfMagic + 8);
        //读取消息体
        ByteBuf buf = byteBuf.readRetainedSlice(length);
        String message = buf.toString(Charset.defaultCharset());
        list.add(message);
    
    //netty 内部工具类
    private static int indexOf(ByteBuf haystack, ByteBuf needle) 
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) 
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) 
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) 
                    break;
                 else 
                    haystackIndex++;
                    if (haystackIndex == haystack.writerIndex() &&
                            needleIndex != needle.capacity() - 1) 
                        return -1;
                    
                
            

            if (needleIndex == needle.capacity()) 
                // Found the needle from the haystack!
                return i - haystack.readerIndex();
            
        
        return -1;
    


客户端

public class ProtocolClient 

    private Channel channel;


    private void start() throws InterruptedException 
        // ServerBootstrap
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(1));
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<Channel>() 
            @Override
            protected void initChannel(Channel ch) 
                ch.pipeline().addLast(new ProtocolCodes());//出站处理
            
        );
        ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
        channel = future.sync().channel();

    


    public static void main(String[] args) throws Exception 
        ProtocolClient client = new ProtocolClient();
        client.start();
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true) 
            String line = reader.readLine();
            client.channel.writeAndFlush(line);
        
    

服务端

public class ProtocolServer 

    private ServerBootstrap bootstrap;

    @Before
    public void init() 
        bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(8));
        bootstrap.channel(NioServerSocketChannel.class);

    

    @After
    public void start() throws InterruptedException 
        ChannelFuture future = bootstrap.bind(8080);
        System.out.println("启动成功");
        future.sync().channel().closeFuture().sync();
    

    @Test
    public void test() 
        bootstrap.childHandler(new ChannelInitializer<Channel>() 
            @Override
            protected void initChannel(Channel ch) 
                ch.pipeline().addLast(new ProtocolCodes());
                ch.pipeline().addLast(new TrackHandler());
            
        ) ;
    
    private static class TrackHandler extends SimpleChannelInboundHandler<String> 
        int i = 0;
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) 
            System.out.println(String.format("消息%s:%s", i++, msg));
            ctx.writeAndFlush("返回消息");
        
    

测试结果:


成功编解码,这下发长度不一致的都可以了。

总结

今天主要是使用了netty编写了自定义协议是如何编解码的,当然小编自定义了一个简单的自定义协议,没有像http或dubbo等协议那么复杂,不过即使这样,大家其实也应该明白咱们的协议底层是如何编解码了吧。
看到这儿大家会不会觉得,网络编程的小伙伴确实会比写业务编程的小伙伴拿的工资高,这是有原因的。愿小编再接再厉,将底层的网络传输原理以及一些常用的协议研究一番,知道其架构的思想并掌握。期待小编后续的netty应用吧。

以上是关于Netty框架之编解码机制二(自定义协议)的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之编解码机制二(自定义协议)

Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包)

Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包)

Netty之编解码

netty实战-自定义解码器处理半包消息

Netty之启动类编解码器等源码解析及粘包拆包问题