Netty实战五之ByteBuf

Posted Java猫说

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty实战五之ByteBuf相关的知识,希望对你有一定的参考价值。

网络数据的基本单位总是字节,Java NIO 提供了ByteBuffer作为它的字节容器,但是其过于复杂且繁琐。

Netty的ByteBuffer替代品是ByteBuf,一个强大的实现,即解决了JDK API的局限性,又为网络应用程序的开发者提供了更好的API。

1、ByteBuf的API

Netty的数据处理API通过两个组件暴露——abstract class ByteBuf 和 interface ByteBufHolder。

以下是其优点:

-可以被用户自定义的缓冲区类型扩展

-通过内置的复合缓冲区类型实现了透明的零拷贝

-容量可以按需增长(类似JDK的StringBuilder)

-在读和写这两个模式之间切换不需要调用ByteBuffer的flip()方法

-读和写使用了不同的索引

-支持方法的链式调用

-支持引用计数

-支持池化

其他类可用于管理ByteBuf实例的分配,以及执行各种针对于数据容器本身和它所持有的数据的操作。

2、ByteBuf如何工作

因为所有的网络通信都涉及字节序列的移动,所以高效易用的数据结构明显是必不可少的。

ByteBuf维护了两个不同的索引:一个用于读取、一个用于写入。当你从ByteBuf读取时,它的readerIndex将会被递增已经被读取的字节数。同样地,当你写入BytBuf时,它的writerIndex也会被递增。下图展示了一个空ByteBuf的布局结构和状态。 

如果我们打算读取字节直到readerIndex达到和writeIndex同样的值时会发生什么,则将会到达“可以读取的”数据的末尾。就如同视图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个indexOutOfBoundsException。

名称以read或者write开头的ByteBuf方法,将会推进其对应的索引,而名称以set或者get开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作。

可以指定ByteBuf的最大容量。试图移动写索引(即writerIndex)超过这个值将会触发一个异常。(默认的限制是Integer.MAX_VALUE)

3、ByteBuf的使用模式-堆缓冲区

一个由不同的索引分别控制读访问和写访问的字节数组。

最常用的ByteBuf模式是将数据存储在JVM的堆空间中。这种模式被称为支撑数组(backing array),它能在没有使用池化的情况下提供快速的分配和释放。这种方式,非常适合于有遗留的数据需要处理的情况。

ByteBuf directBuf = ...;
        //检查ByteBuf是否由数组支撑。如果不是,则这是一个直接缓冲区        if (!directBuf.hasArray()){
            //获取可读字节数            int length = directBuf.readableBytes();
            //分配一个新的数组来保存具有该长度的字节数据
            byte[] array = new byte[length];
            //将字节复制到该数组            directBuf.getBytes(directBuf.readerIndex(),array);
            //使用数组、偏移量和长度作为参数调用你的方法            handleArray(array, 0 ,length);
        }

4、ByteBuf的使用模式-直接缓冲区

直接缓冲区是另外一种ByteBuf模式。我们期望用于对象创建的内存分配永远都来自于堆中,但这并不是必须的——NIO在JDK1.4中引入的ByteBuffer类允许JVM实现通过本地调用来分配内存。这主要是为了避免在每次调用本地I/O操作之前(或者之后)将缓存区的内容复制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区)。

ByteBuffer的Javadoc明确指出:“直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外”。这也就解释了为何直接缓冲区对于网络数据传输是理想的选择。如果你的数据包含在一个在堆上分配的缓冲区中,那么事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲区复制到一个直接缓冲区中。

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都教委昂贵。如果你正在处理遗留代码,你也可能会遇到另一个缺点:因为数据不是在堆上,所以你不得不进行一次复制。如下代码所示。显然,这比使用支撑数组相比工作量更多。

ByteBuf heapBuf = ...;        //检查ByteBuf是否有一个支撑数组
        //当hasArray()方法返回false时,尝试访问支撑数组将触发一个UnsupportedOperationException
        //这个模式类似于JDK的ByteBuffer的用法
        if (heapBuf.hasArray()){            //如果有,则获取对该数组的引用
            byte[] array = heapBuf.array();            //计算第一个字节的偏移量
            int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();            //获得可读字节数
            int length = heapBuf.readableBytes();            //使用数组、偏移量和长度作为参数调用你的方法
            handleArray(array,offset,length);
        }

5、ByteBuf的使用模式-复合缓冲区

它为多个ByteBuf提供一个聚合视图。在这里你可以根据需要添加或者删除ByteBuf实例,这是一个JDK的ByteBuffer实现完全缺失的特性。

Netty通过一个ByteBuf子类——CompositeByteBuf——实现了这个模式,他提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。

警告:CompositeByteBuf中的ByteBuf实例可能同时包含直接内存分配和非直接内存分配。如果其中只有一个实例,那么对CompositeByteBuf上的hsaArray()方法的调用将返回该组件上的hasArray()方法的值;否则它将返回false。

为了举例说明,让我们考虑一下一个由两个部分——头部和主体——组成的将通过HTTP协议传输的消息。这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序可以选择多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新的头部。

因为我们不想为每个消息都重新分配这两个缓冲区,所以使用CompositeByteBuf是一个完美的选择。它在消除了没必要的复制的同时,暴露了通用的ByteBuf API。

Netty实战五之ByteBuf

以下代码展示了如何通过使用JDK的ByteBuffer来实现这一需求。创建一个包含两个ByteBuffer的数组用来保存这些消息组件,同时创建了第三个ByteBuffer用来保存所有这些数据的副本。

//Use an array to hold message parts
        ByteBuffer[] message = new ByteBuffer[]{header,body};
        //Create a new ByteBuffer and use copy to merge the header and body
        ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining());        message2.put(header);        message2.put(body);        message2.flip();

分配和复制操作,以及伴随着数组管理的需要,使得这个版本的实现效率低下而且笨拙。

CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
        ByteBuf headerBuf = ...;//can be backing or direct
        ByteBuf bodyBuf = ...;//can be backing or direct
        //将ByteBuf实例追加到CompositeByteBuf
        messageBuf.addComponents(headerBuf,bodyBuf);
        ......
        //删除位于索引位置为0(第一个组件)的ByteBuf
        messageBuf.removeComponent(0);//remove the header
        //循环遍历所有的ByteBuf实例        for(ByteBuf buf: messageBuf){
            System.out.println(buf.toString());
        }

CompositeByteBuf可能不支持访问其支撑数组,因此访问CompositeByteBuf中的数据类似于(访问)直接缓冲区的模式。

CompositeByteBuf compBuf = Unpooled.compositeBuffer();
        //获得可读字节数        int length = compBuf.readableBytes();
        //分配一个具有可读字节数长度的新数组
        byte[] array = new byte[length];
        //将字节读到该数组中        compBuf.getBytes(compBuf.readerIndex(),array);
        //使用偏移量和长度作为参数使用该数组        handleArray(array,0,array.length);

需要注意的是,Netty使用了CompositeByteBuf来优化套接字的I/O操作,尽可能地消除了由JDK的缓冲区实现所导致的性能以及内存使用率的惩罚。这种优化发生在Netty的核心代码中,因此不会被暴露出来,但是你应该知道它带来的影响。

6、字节级操作——随机访问索引

如同在普通的Java字节数组中一样,ByteBuf的索引是从零开始的:第一个字节的索引是0,最后一个字节总是capacity()-1.以下代码表明,对存储机制的封装使得遍历ByteBuf的内容非常简单。

ByteBuf buffer = ...;        for (int i = 0; i < buffer.capacity(); i++){
            byte b = buffer.getByte(i);
            System.out.println((char)b);
        }

需要注意的是,使用那些需要一个索引值参数的方法之一来访问数据既不会改变readerIndex也不会改变weriterIndex。如果有需要,也可以通过调用readerIndex(index)或者writerIndex(index)来手动移动这两者。

7、字节级操作——顺序访问索引

虽然ByteBuf同时具有读索引和写索引,但是JDK的ByteBuffer却只有一个索引,这也就是为什么必须调用flip()方法来在读模式和写模式之间进行切换的原因。下图展示了ByteBuf是如何被它的两个索引划分成3个区域的。 Netty实战五之ByteBuf

8、字节级操作——可丢弃字节

在上图中标记为可丢弃字节的分段包含了已经被读过的字节。通过调用discardReadBytes()方法,可以丢弃它们并回收空间。这个分段的初始大小为0,存储在readerIndex中,会随着read操作的执行而增加(get*操作不会移动readerIndex)。

下图展示了上图所展示的缓冲区上调用discardReadBytes()方法后的结果。可以看到,可丢弃字节分段中的空间已经变为可写的了。注意,在调用discardReadBytes()之后,对可写分段的内容并没有任何的保证。(因为只是移动了可以读取的字节以及writerIndex,而没有对所有可写入的字节进行擦除写。) Netty实战五之ByteBuf

虽然你可能会倾向于频繁地调用discardReadBytes()方法以确保可写分段的最大化,但是请注意,这将极有可能会导致内存复制,因为可读字节(图中标记为CONTENT的部分)必须被移动到缓冲区的开始位置。我们建议只在真正需要的时候才这样做,例如,当内存非常宝贵的时候。

9、字节级操作——可读字节

ByteBuf的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的readerIndex值为0。任何名称以read或者skip开头的操作都将检索或者跳过位于当前readerIndex的数据,并且将它增加已读字节数。

如果被调用的方法需要一个ByteBuf参数作为写入的目标,并且没有指定目标索引参数,那么该目标缓冲区的writerIndex也将被增加,例如: readBytes(ByteBuf dest);

如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据,那么将会引发一个IndexOutOfBoundsException。

下图展示了如何读取所有可以读的字节。

ByteBuf buffer = ...;        while (buffer.isReadable()){
            System.out.println(buffer.readByte());
        }

10、字节级操作——可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的writeIndex的默认值为0.任何名称以write开头的操作都将从当前的writeIndex处开始写数据,并将它增加已经写入的字节数。如果写操作的目标也是ByteBuf,并且没有指定源索引的值,则源缓冲区的readerIndex也同样会被增加相同的大小。这个调用如下所示:

writeBytes(ByteBuf dest);

如果尝试往目标写入超过目标容量的数据,将会引发一个IndexOutOfBoundException。以下代码是一个用随机整数值填充缓存区,直到它空间不足为止的例子。writeableBytes()方法在这里被用来确定该缓冲区中是否还有足够的空间。

 //Fills the writable bytes of a buffer with random integers
        ByteBuf buffer = ...;        while (buffer.writableBytes() >= 4){
            buffer.writeInt(random.nextInt());
        }

11、索引管理

JDK的InputStream定义了mark(int readlimit)和reset()方法,这些方法分别被用来将流中的当前位置标记为指定的值,以及将流重置到该位置。

同样,可以通过调用markReaderIndex()、markWriterindex()、resetWriterIndex()和resetReaderIndex()来标记和重置ByteBuf的readerIndex和writerIndex。这些和InputStream上的调用类似,只是没有readlimit参数来指定标记什么时候失效。

也可以通过调用readerIndex(int)或者writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个IndexOutOfBoundsException。

可以通过调用clear()方法来将readerIndex和writerIndex都设置为0.注意,这并不会清除内存中的内容。 

和之前一样,ByteBuf包含3个分段,下图展示了在clear()方法被调用之后ByteBuf的状态。 

调用clear()比调用discardReadBytes()轻量得多,因为它将只是重置索引而不会复制任何的内存。

12、查找操作

在ByteBuf中有多种可以用来确定指定值的索引的方法,最简单的是使用indexOf()方法。较复杂的查找可以通过那些需要一个ByteBufProcessor作为参数的方法达成。这个接口只定义了一个方法:

boolean process(byte value)

它将检查输入值是否是正在查找的值

ByteBufProcessor针对一些常见的值定义了许多遍历的方法。假设你的应用程序需要和所谓的包含有以NULL结尾的内容的FLash套接字集成。调用、 forEachByte(ByteBufProcessor.FIND_NUL) 将简单高效地消费该Flash数据,因为在处理期间只会执行较少的边界检查。 以下代码展示了一个查找回车符(\r)的例子

ByteBuf buffer = ...;
       int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);

13、派生缓冲区

派生缓冲区为ByteBuf提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

·duplicate()

·slice()

·slice(int,int)

·Unpooled.unmodifiableBuffer(…)

·order(ByteOrder)

·readSlice(int)

每个这些方法都将返回一个新的ByteBuf实例,它具有自己的读索引、写索引和标记索引。其内部存储和JDK的ByteBuffer一样也是共享的。这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心。

ByteBuf复制 : 如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int,int)方法,不同于派生缓冲区,由这个调用所返回的ByteBuf拥有独立的数据副本。

以下代码展示了如何使用slice(int,int)方法来操作ByteBuf的一个分段

Charset utf8 = Charset.forName("UTF-8");        //创建一个用于保存给定字符串的字节的ByteBuf
       ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8);       //创建该ByteBuf从索引0开始到索引15结束的一个新切片
        ByteBuf sliced = buf.slice(0,15);        //打印内容
        System.out.println(sliced.toString(utf8));        //更新索引0处的字节
        buf.setByte(0,(byte)'J');        //将会成功,因为数据是共享的,对其中一个所做的更改对另一个也是可见的
        assert buf.getByte(0) == sliced.getByte(0);

以下让我们看看,ByteBuf的分段的副本和切片有何区别

Charset utf8 = Charset.forName("UTF-8");       //创建ByteBuf以保存所提供的字符串的字节
       ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8);       //创建该ByteBuf从索引0开始到索引15结束的分段的副本
       ByteBuf copy = buf.copy(0,15);       //打印内容
        System.out.println(copy.toString(utf8));        //更新索引0处的字节
        buf.setByte(0,(byte)'J');        //将会成功,因为数据不是共享的
        assert buf.getByte(0) != copy.getByte(0);

除了修改原始ByteBuf的切片或者副本的效果以外,这两种场景是相同的。只要有可能,使用slice()方法来避免复制内存的开销。

14、读/写操作

get()和set()操作,从给定的索引开始,并且保持索引不变

read()和write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。 以下代码说明了其用法,表明了他们不会改变读索引和写索引。

Charset utf8 = Charset.forName("UTF-8");       //创建一个新的ByteBuf以保存给定字符串的字节
       ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8);       //打印第一个字符‘N’
        System.out.println((char)buf.getByte(0));        //存储当前的readIndex和writeIndex
        int readerIndex = buf.readerIndex();        int writeIndex = buf.writerIndex();        //将索引0处的字节更新为字符‘B’
        buf.setByte(0,(byte)'B');        //打印第一个字符,现在是‘B’
        System.out.println((char)buf.getByte(0));        //将会成功,因为这些操作并不会修改相应的索引
        assert readerIndex == buf.readerIndex();        assert writeIndex == buf.writerIndex();

还有read()操作,其作用于当前的readerIndex或writeIndex。这些方法将用于从ByteBuf中读取数据。如同它是一个流。

几乎每个read()方法都有对应的write()方法,用于将数据追加到ByteBuf中,以下代码展示了read()和write()操作

Charset utf8 = Charset.forName("UTF-8");        //创建一个新的ByteBuf以保存给定字符串的字节
        ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!",utf8);        //打印字符‘N’
        System.out.println((char)buf.readByte());        //存储当前的readerIndex
        int readerIndex = buf.readerIndex();        //存储当前的writeIndex
        int writeIdnex = buf.writerIndex();        //将字符‘?’追加到缓冲区
        buf.writeByte((byte)'?');        assert readerIndex == buf.readerIndex();        //将会成功,因为writeByte()方法移动了writeIndex
        assert writeIdnex != buf.writerIndex();

15、ByteBufHolder接口

我们经常发现,除了实际的数据负载之外,我们还需要存储各种属性值。HTTP响应便是一个很好的例子,除了表示为字节的内容,还包括状态码、cookie等。

为了处理这种常见的用例,Netty提供了ByteBufHolder。ByteBufHolder也为Netty的高级特性提供了支持,如缓冲区池化,其中可以从池中借用ByteBuf,并且在需要时自动释放。

ByteBufHolder只有几种用于访问底层数据和引用计数的方法。

如果想要实现一个将其有效负载存储在ByteBuf中的消息对象,那么ByteBufHolder将是个不错的选择。

16、按需分配:ByteBufAllocator接口

为了降低分配和释放内存的开销,Netty通过interface ByteBufAllocator实现了(ByteBuf的)池化,它可以用来分配我们所描述过的任何类型的ByteBuf实例。使用池化是特定于应用程序的决定,其并不会以任何方式改变ByteBuf API。

可以通过Channel(每个都可以有一个不同的ByteBufAllocator实例)或者绑定到ChannelHandler的ChannelHanlderContext获取一个到ByteBufAllocator的引用。

io.netty.channel.Channel channel = ...;
        ByteBufAllocator allocator = channel.alloc();
        .....
        ChannelHandlerContext ctx = ...;
        ByteBufAllocator allocator1 = ctx.alloc();
        ....

Netty提供了两种ByteBufAllocator的实现:PooledByteAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。此实现使用了一种称为jemalloc的已被大量现代操作系统所采用的高效方法来分配内存。后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。

虽然Netty默认使用了PooledByteBufAllocator,但这可以很容易地通过ChannelConfig API或者在引导你的应用程序时指定一个不同的分配器来更改。

17、Unpooled缓冲区

可能某些情况下,你未能获取一个到ByteBufAllocator调用,对于这种情况,Netty提供了一个简单的成为Unpooled的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。

Unpooled类还使得ByteBuf同样可用于那些并不需要Netty的其它组件的非网络项目,使得其能得益于高性能的可扩展的缓冲区API。

18、ByteBufUtil类

ByteBufUtil提供了用于操作ByteBuf的静态的辅助方法。因为这个API是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。

这些静态方法中最有价值的可能是hexdump()方法,它以十六进制的表示形式打印ByteBuf的内容。这在各种情况下都很有用,例如,出于调试的目的记录ByteBuf的内容。十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。

另一个有用的方法是boolean equals(ByteBuf,ByteBuf),它被用来判断两个ByteBuf实例的相等性,如果你实现了自己的ByteBuf子类,你可能会发现ByteBufUtil的其它有用方法。

19、引用计数

引用计数是一种通过在某个对象所持有的资源不再被其它对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。

引用技术背后的想法并不是特别的复杂,他主要设计跟踪到某个特定对象的活动引用的数量。一个ReferenceCounted实现的实例将通常以活动的引用计数为1作为开始。只要引用计数大于0,就能保证对象不会被释放。当活动引用的数量减少到0时,该实例就会被释放。注意,虽然释放的确切语义可能是特定于实现的,但是至少已经释放的对象应该不可再用了。

引用技术对于池化实现(PooledByteBufAllocator)来说是至关重要的,它降低了内存分配的开销。

io.netty.channel.Channel channel = ...;
        //从Channel获取ByteBufAllocator
        ByteBufAllocator allocator = channel.alloc();        ...
        //从ByteBufAllocator分配一个ByteBuf
        ByteBuf buffer = allocator.directBuffer();
        //检查引用技术是否为预期的1
        assert buffer.refCnt() == 1;
ByteBuf buffer = ...;
        //减少到该对象的活动引用。当减少到0时,该对象被释放,并且该方法返回true
        boolean released = buffer.release();

试图访问一个已经被释放的引用计数对象,将会导致一个IllegalReferenceCountException。 注意,一个特定的(ReferenceCounted的实现)类,可以用它自己的独特方式来定义它的引用计数规则。例如,我们可以设想一个类,其release()方法的实现总是将引用计数设为零,而不用关心它的当前值,从而一次性地使所有的活动都失效。

谁负责释放 : 一般来说,是由最后访问(引用计数)对象的那一方来负责将它释放。


以上是关于Netty实战五之ByteBuf的主要内容,如果未能解决你的问题,请参考以下文章

Netty入门——ByteBuf

Netty 框架中ByteBuf转String

「Netty实战 03」大白话 Netty 核心组件分析

Netty ByteBuf 占用所有内存

Netty_03_ByteBuf和网络中拆包粘包问题及其解决

Netty_03_ByteBuf和网络中拆包粘包问题及其解决