Netty中的字节操作

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty中的字节操作相关的知识,希望对你有一定的参考价值。

参考技术A

Netty 作为一个优秀网络框架,其高效的 内存操作 也是使其变得 高性能 的很重要原因之一。

众所周知, Java 的 NIO 中提供了类 ByteBuffer 作为字节的容器,但是操作非常的复杂,Netty针对 ByteBuffer 设计了一个替代类 ByteBuf ,方便开发者操作字节。

对于任意一个 ByteBuf 对象,都拥有三个非常重要的属性:

ByteBuf 对象每读取一个byte的数据,readerIndex就会往前推进,直到readerIndex到达capacity的值,所有的数据的数据都被读取完, ByteBuf 不可再被读取。可以通过 readableBytes() 方法获取 readerIndex 的值。

相同地,writerIndex记录了 ByteBuf 对象使用了多少数据,可以通过 writableBytes() 方法获取writerIndex的值。每当 ByteBuf 被写入了多少数据,writerIndex就会往前推进,直到值到达capacity的值, ByteBuf 会自动对空间进行扩容。

对于任意一个 ByteBuf 对象,我们都可以根据它的索引通过 getByte() 方法随机访问中间的数据。随机访问不会改变 readerIndex 的值。

通过 array() 方法可以直接获取, ByteBuf 中的Byte数组信息。

Netty的“Zero-Copy”设计非常出名,这主要就是依赖了Netty中 ByteBuf 的设计。 ByteBuf 主要有以下几种模式:

顾名思义,这个模式下的字节是在Jvm的堆区操作的,也是最常见的内存操作了。

在JDK1.4中,Java引入了一种 直接内存 ,NIO可以通过 本地方法 分配一些堆外的直接内存,这块内存区不受Jvm的控制,理论上的无限的。

对于网络Socket通信来说,这种内存区域的好处是Java在通信中,数据不必从Jvm中拷贝一份到系统的直接内存区上,操作系统的Socket接口可以直接处理这份在直接内存的数据。同时由于数据在堆外,也避免了频繁GC对这块区域的影响。

ByteBuf 提供了 Direct Buffer 模式,我们可以直接通过 ByteBuf 操作 直接内存

Direct Buffer 模式下,由于数据不在堆上面, ByteBuf 是不可以直接使用 array() 方法获取数据的。

在TCP协议中,一份完整的数据总是被拆成好几个包被发送或者接收,一般情况下,程序会通过内存拷贝的方式将一组数据拷贝到一个大的数组中,形成一份完整的数据。

而Composite buffer模式可以聚合多个ByteBuffer对象,将这组数据的引用收集到一个 ByteBuf 对象中, 避免 了数据的拷贝。

当然为了避免Netty本身内存使用过度,Netty内部对所有的内存做了池化。通过 ByteBufAllocator 类,我们可以分配一块被池化的内存,从而减少分配和释放内存的开销。

如果我们希望使用一块新的内存,或者对一个已经存在的内存进行包装,那么我们可以使用 Unpooled 类来分配内存:

Netty中的ChannelHandler的基础知识

Netty ChannelHandler概览     

Netty中的ChannelHandler类似于工程对产品流水线生产中:生产线上的每一步的处理器,而生产线则就是Netty中的ChannelPipeline。Netty也正是通过ChannelHandler实现了业务与底层网络的解耦。Netty中的ChannelHandler按照输出字节流向分为In,Out,Duplex三种ChannelHandler,本文主要介绍前面两种,Netty中ChannelHandler关系图如下:

  • ChannelHander是顶级抽象,基础接口类。
  • ChannelHandlerAdapter则是ChannelHandler的一个适配器,对ChannelHandler添加了新的行为,用于判断当前Handler是否执行共享,ChannelHandler共享是指:如果一个ChannelHandler支持共享,则该ChannelHandler可以被添加到多个ChannelPipeline中。
  • ChannelInboundHandler、ChannelOutboundHandler则是分别对应进站与出站的ChannelHandler处理器。
  • ChannelInboundHandlerAdpater、ChannelOutboundHandlerAdapter分别实现了对应类型的ChannelHandler之外,还实现了ChannelHandlerAdpater接口,这样就有了判断是否是共享ChannelHandler的行为。

ChannelInboundHandler

Netty中无论对应客户端还是服务器端都有读、写操作,读操作对应的就是进站操作,进站的数据都会经过ChannelInboundHandler处理,例如解码器都是ChannelInboundHandler的子类实现;本文简单介绍一下SimpleChannelInboundHandler。

SimpleChannelInboundHandler是ChannelInboundHandlerAdpater子类实现,SimpleChannelInboundHandler是一个类型敏感的处理器,在明确上游消息类型时候可以使用该处理器,这样就避免了类型的强转;在SimpleChannelInboundHandler中新增加两个方法分别是:acceptInboundMessage与channelRead0方法,同时重写了channelRead方法,在channelRead方法中进行处理类型转换,对于下游事件处理通过重写channelRead0即可,具体源码如下:


    /**
     * Returns @code true if the given message should be handled. If @code false it will be passed to the next
     * @link ChannelInboundHandler in the @link ChannelPipeline.
     */
    public boolean acceptInboundMessage(Object msg) throws Exception 
        return matcher.match(msg);
    

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        boolean release = true;
        try 
            if (acceptInboundMessage(msg)) 
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
             else 
                release = false;
                ctx.fireChannelRead(msg);
            
         finally 
            if (autoRelease && release) 
                ReferenceCountUtil.release(msg);
            
        
    

    /**
     * <strong>Please keep in mind that this method will be renamed to
     * @code messageReceived(ChannelHandlerContext, I) in 5.0.</strong>
     *
     * Is called for each message of type @link I.
     *
     * @param ctx           the @link ChannelHandlerContext which this @link SimpleChannelInboundHandler
     *                      belongs to
     * @param msg           the message to handle
     * @throws Exception    is thrown if an error occurred
     */
    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

备注:ChannelInitializer也是一个入站处理器,当Channel建立时候进行一些初始化工作,例如Pipeline上面添加Handler操作。

ChannelOutboundHandler

Netty中Channel上的写操作对应着出站操作,通常流经ChannelOutboundHandler处理之后通过网络将数据发出去;ChannelOutboundHandler的子类实现一般都是编码器,对应编码操作。

MessageToByteEncoder是一个类型敏感的编码器,将指定类型的Message序列化为二进制数据;该类新增三个方法分别是:acceptOutboundMessage(用于判断类型),allocateBuffer(分配缓冲区),encode(编码逻辑供子类具体实现);而write方法中进行类型的处理,判断是否类型匹配,匹配则进行encode处理。


    /**
     * Returns @code true if the given message should be handled. If @code false it will be passed to the next
     * @link ChannelOutboundHandler in the @link ChannelPipeline.
     */
    public boolean acceptOutboundMessage(Object msg) throws Exception 
        return matcher.match(msg);
    

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
        ByteBuf buf = null;
        try 
            if (acceptOutboundMessage(msg)) 
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try 
                    encode(ctx, cast, buf);
                 finally 
                    ReferenceCountUtil.release(cast);
                

                if (buf.isReadable()) 
                    ctx.write(buf, promise);
                 else 
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                
                buf = null;
             else 
                ctx.write(msg, promise);
            
         catch (EncoderException e) 
            throw e;
         catch (Throwable e) 
            throw new EncoderException(e);
         finally 
            if (buf != null) 
                buf.release();
            
        
    

    /**
     * Allocate a @link ByteBuf which will be used as argument of @link #encode(ChannelHandlerContext, I, ByteBuf).
     * Sub-classes may override this method to return @link ByteBuf with a perfect matching @code initialCapacity.
     */
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception 
        if (preferDirect) 
            //Direct Buffer
            return ctx.alloc().ioBuffer();
         else 
            //Heap Buffer
            return ctx.alloc().heapBuffer();
        
    

    /**
     * Encode a message into a @link ByteBuf. This method will be called for each written message that can be handled
     * by this encoder.
     *
     * @param ctx           the @link ChannelHandlerContext which this @link MessageToByteEncoder belongs to
     * @param msg           the message to encode
     * @param out           the @link ByteBuf into which the encoded message will be written
     * @throws Exception    is thrown if an error occurs
     */
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

ChannelDuplexHandler

经过如上的进站、出站ChannelHandler的理解基础之上,对于ChannelDuplexHandler理解就比较容易了,ChannelDuplexHandler是一个双工的处理器,既可以处理入站信息,也可以处理出站信息,通常对应的也就是编解码器(Codec-编解码器)。

Netty中的MessageToMessageCodec就是一个典型的编解码器,该编解码器新增加了4个方法,分别进站对应两个,出站对应两个,对于encode与decode由子类具体实现响应的编码与解码。如下是一个简单的Long与Integer类型之间编码解码的实现:

public class NumberCodec extends MessageToMessageCodec<Integer, Long> 
    @Override
    public Long decode(ChannelHandlerContext ctx, Integer msg, List<Object> out)
            throws Exception 
        out.add(msg.longValue());
    

    @Override
    public Integer encode(ChannelHandlerContext ctx, Long msg, List<Object> out)
            throws Exception 
        out.add(msg.intValue());
    

 

以上是关于Netty中的字节操作的主要内容,如果未能解决你的问题,请参考以下文章

Netty编解码框架分析

浅析操作系统和Netty中的零拷贝机制

java编解码技术,netty nio

「Netty实战 03」大白话 Netty 核心组件分析

Netty零拷贝

从0到1 ▏Netty编解码框架之多种常用解码器使用示例解析