Netty解码器

Posted 落叶飞翔的蜗牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty解码器相关的知识,希望对你有一定的参考价值。

P art1




1
Netty——分隔符和定长解码器

TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用一下4种方式:

1.消息长度固定。累计读取到指定长度的消息后就认为读取了一个完整的消息;将计数器置位,重新开始读取下一个数据报

2.将回车符作为消息结束符。如FTP协议

3.将特殊的分隔符作为消息结束的标志。换行符就是一种特殊的结束分隔符

4.通过在消息头中长度字段来表示消息的总长度

Netty对上述4种方式提供了统一的抽象,提供4种解码器来解决对应的问。


2
  解码器介绍


DelimiterBasedFrameDecoder:自动完成以分隔符作为标识符的消息接码

FixedLengthFrameDecoder:自动完成对定长消息的接码

 
P art2


1
DelimiterBasedFrameDecoder客户端
public class DelimiterBasedFrameDecoderEchoClient {

    public void connect(int port, String host) {
        try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NiosocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler());
                        }
                    });
            //异步链接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //等待客户端
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}

2
  DelimiterBasedFrameDecoder客户端处理类
public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
    private AtomicInteger count = new AtomicInteger(0);
    private byte[] req;

    public DelimiterBasedFrameDecoderEchoClientHandler() {
        req = ("hello world" + "$_").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 读取并打印消息
     * @param ctx
     * @param msg
     * @throws Exception
     */

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}



3
  DelimiterBasedFrameDecoder服务器端
public class DelimiterBasedFrameDecoderEchoServer {

    public void bind(int port) {
        //配置服务器端NIO线程组
        //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
        try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
            //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //功能类似于NIO中的ServerSocketChannel
            bootstrap.group(bossLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //配置NioServerSocketChannel的参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //绑定事件的处理类ChildChannelHandler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            //DelimiterBasedFrameDecoder解码器 $_ 作为分隔符
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            //StringDecoder解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler());
                        }
                    });
            //绑定端口,同步等待绑定操作完成
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            //等待服务器监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoServer().bind(port);
    }
}

4
  DelimiterBasedFrameDecoder服务器端处理类
public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {

    private AtomicInteger count = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("服务器端第" + count.incrementAndGet() + "次收到消息:" + body);
        ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + "$_").getBytes());
        //并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息发送队列中的消息写入SocketChannel中,发送到对方
        //防止频繁的唤醒Selector进行消息发送
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //发生异常关闭ChannelHandlerContext等资源
        ctx.close();
    }
}

   
5
  DelimiterBasedFrameDecoder 执行结果
服务器端第1次收到消息:hello world
服务器端第2次收到消息:hello world
服务器端第3次收到消息:hello world
服务器端第4次收到消息:hello world
服务器端第5次收到消息:hello world
服务器端第6次收到消息:hello world
服务器端第7次收到消息:hello world
服务器端第8次收到消息:hello world
服务器端第9次收到消息:hello world
服务器端第10次收到消息:hello world
······
服务器端第99次收到消息:hello world
服务器端第100次收到消息:hello world

客户端第1次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第2次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第3次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第4次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第5次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第6次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第7次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第8次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第9次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第10次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
·····
客户端第98次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第99次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第100次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
P art3



 
1
  FixedLengthFrameDecoder客户端
public class FixedLengthFrameDecoderEchoClient {

    public void connect(int port, String host) {
        try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler());
                        }
                    });
            //异步链接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //等待客户端
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}


   
2
  FixedLengthFrameDecoder客户端处理类
public class FixedLengthFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
    private AtomicInteger count = new AtomicInteger(0);
    private byte[] req;

    public FixedLengthFrameDecoderEchoClientHandler() {
        req = ("hello world").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 读取并打印消息
     * @param ctx
     * @param msg
     * @throws Exception
     */

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
3
  FixedLengthFrameDecoder服务器端
public class FixedLengthFrameDecoderEchoServer {

    public void bind(int port) {
        //配置服务器端NIO线程组
        //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
        try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup()){
            //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //功能类似于NIO中的ServerSocketChannel
            bootstrap.group(bossLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //配置NioServerSocketChannel的参数
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //绑定事件的处理类ChildChannelHandler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //FixedLengthFrameDecoder解码器
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11));
                            //StringDecoder解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler());
                        }
                    });
            //绑定端口,同步等待绑定操作完成
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            //等待服务器监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoServer().bind(port);
    }
}
4
  FixedLengthFrameDecoder服务器端处理类
public class FixedLengthFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("服务器端收到消息:" + body);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息发送队列中的消息写入SocketChannel中,发送到对方
        //防止频繁的唤醒Selector进行消息发送
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //发生异常关闭ChannelHandlerContext等资源
        ctx.close();
    }
}
5
  FixedLengthFrameDecoder测试结果
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
······




E ND


文章来自:狂奔的蜗牛




以上是关于Netty解码器的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之编解码机制二(自定义协议)

Netty框架之编解码机制二(自定义协议)

Netty框架之编解码机制二(自定义协议)

在 Android 上添加 spdy 编解码器后使用 netty 连接失败

10.netty客户端与服务器使用protobuf传输报文

带有 JSON 的 JBoss Netty