Netty之ByteBuf原理解析及应用
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty之ByteBuf原理解析及应用相关的知识,希望对你有一定的参考价值。
前言
本篇文章从源码去解析Netty中ByteBuf对Nio中ByteBuf进行对比,做了那些提升并优化。在使用netty框架时,有那些优点;解析ByteBuf如何做到动态扩容得。如何选择合适得ByteBuf ,以及Netty很大得一个特性零拷贝机制都会在这篇博客中进行解析
ByteBuf概述
ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。
JDK Nio ButeBuffer 的缺点:
-
无法动态扩容
- API 使用复杂
读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的使用这些api,否则很容出现错误
相对JDK Nio ButeBuffer 做的增强
- API操作便捷性
- 动态扩容
- 多种 ByteBuf的实现 例如堆外内存的实现
- 高效的零拷贝机制
ByteBuf 特性
ByteBuf和Nio 中ByteBuffer一致 也提供了三个重要属性:
- 随机访问索引 getByte
-
顺序读 read*
-
顺序写 write*
-
清除已读内容 discardReadBytes
-
清除缓冲区 clear
-
搜索操作
-
标记和重置
-
引用计数和释放
在ByteBuf中所有方法
当读取操作时,会将reader index的位置进行修改。
在源码中显示
它和Nio中ByteBuffer进行比较 指针含义是不一样的,采用reader index 和writerIndex 的方式更易于操作
源码中显示是在AbstractByteBuf 中做的一个展示
ByteBuf简单应用
- 申请容量为10的ByteBuf
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf为====================>" + buf.toString());
System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\\n");
- 如果容量不规定,netty 的ByteBuf是会为申请一个 256容量大小的
ByteBuf buf = Unpooled.buffer();
System.out.println("原始ByteBuf为====================>" + buf.toString());
System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\\n");
- 容量是来自于AbstractByteBufAllocator 中设置的默认属性
- 包括写入和读取都不用 写filp 和close方法去调整
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\\n");
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\\n");
- 包括下面的 丢弃读取内容, 但是丢弃内容的原理是把后面的数据往前面移动。
// 3.读取一段内容
byte b1 = buf.readByte();
// 4.将读取的内容丢弃
buf.discardReadBytes();
// 5.清空读写指针
buf.clear();
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
ByteBuf 动态扩容
在源码中 writeBytes 方法写入进去时,就会走到 AbstractByteBuf 中
- ensureWritable 方法中会去检查容量大小 并进行扩容
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
checkPositiveOrZero(minWritableBytes, "minWritableBytes"); //校验是否为0
ensureWritable0(minWritableBytes); // 确保可以写
return this;
}
// 容量校验
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
if (checkBounds) {
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
}
//将当前容量正常化为2的幂。
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// 调整到新的容量。
capacity(newCapacity);
}
- 在calculateNewCapacity中计算新的容量 这里对于新容量扩容判断 最终的容量 一定是2的幂次方,这有点像hashmap中容量规整 以4m为临界点
//计算新容量
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) { //写入容量大于最大容量报错
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page 阈值
if (minNewCapacity == threshold) { //如果写入容量等于4mib就返回
return threshold;
}
// 如果超过阈值,不要加倍,只需增加阈值即可。
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold; //新容量等于 新容量的最小要求/阈值*阈值 使得新的容量等于4mb的倍数
if (newCapacity > maxCapacity - threshold) {//接近最大值,离最大值不足4mb时,新容量就是最大值
newCapacity = maxCapacity;
} else { // 否则 新容量等于 新容量的最小要求/阈值*阈值
newCapacity += threshold;
}
return newCapacity;
}
// 不超过临界值。从64开始,最多加倍4个MiB。
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
而这个阈值的来源是AbstractByteBufAllocator 类中 CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
ByteBuf 实现
ByteBuf有3个维度的划分方式,8种具体的实现
在使用过程都是通过ByteBufAllocator 分配器进行申请,同时分配器具有内存管理的功能
在nio中的ByteBuffer 是对数组的封装;而netty中的ByteBuf 也相当于一个数组 ,会在上面加很多包括池化 动态扩容等等,都是方便管理和使用商业api
从名字去看 pool 开头的为池化的 ByteBuf ,而unpool则是非池化的ByteBuf;
池化和非池化的对比和区别:
- 池化(pool ):内存使用过后并不释放,存放在池子中,如果还要使用,在去找到原来的的内存,以减少创建和释放的开销。来自于内存复用机制
- 非池化(unpool):内存使用过后就释放,如果在使用,在去申请。
堆内内存和对外内存对应这Heap 和 Direct :
这个对应nio中的ByteBuffer中的内存一致 ,概念在于堆外内存相对于堆内存是会提高性能。会减少一次数据间的复制。
在netty中默认就是PooledUnsafeDirectByteBuf
Unsafe的实现
通过Unsafe直接操作系统底层的东西,性能是比较快的。
操作是不安全的。这里的不安全针对的不是数据线程不安全,针对的是,底层系统支持存在的问题
- 这里从源码中就能看到对应的unsafe(PooledUnsafeDirectByteBuf)
- Netty会自动检测判断系统是否支持unsafe,来看判断是否允许这个类
- 在Netty中需要申请ByteBuf 可以通过Unpooled 进行申请非池化的Buf
public final class Unpooled {
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return ALLOC.directBuffer(initialCapacity, maxCapacity);
}
UnpooledByteBufAllocator是非池化的分配器
Netty中具有两种分配器包括UnpooledByteBufAllocator 和 PooledByteBufAllocator
对于池化没有工具类,也可以采用 PooledByteBufAllocator 分配器类似非池化的分配器,进行写一个自己的工具类
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
这些分配器都会做一个选择那个ByteBuf ,对象里面会做一个内容增加
@Override
protected byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
return bytes;
}
- 对于池化的 Buf,是没有工具类提供的。 所以我们还是使用非池化的
- 堆内内存和堆外内存使用上差别在于 array方法,堆外内存是没有实现的。
对比Unsafe和Safe操作
对于unsafe操作 是通过 PlatformDependent api操作 native方法操作底层。都是native的操作
对于safe的操作则就是 使用HeapByteBufUtil 中方法 ,直接使用 数组进行实现的。
PooledByteBuf对象、内存复用
池化ByteBuf 分析
- 首先 计算出内存需要的大小,并且计算内存块在缓存池中位置
- 找到了则从缓存池中取
- 没有找到或者不在缓存池中位置,则重新分配一块内存
缓存池原理
一共有三块区域 tiny small normal 区域
- 对于tiny 区域只存储16b到512b 的 内存
- 对于small 只存储512b到4kb的内存
- nomal存储4kb到32kb的存储
这里主要注意 一是有大小,以及每个格子都有数量限制,达到512个格子内存也会被回收
这中间的内存分配机制为jemalloc机制,是linux中内存分配机制
缓存池源码
- 从入口开始进去,统一从AbstractNioByteChannel 的read 方法中进入,接收请求过来的
- 这里ByteBufAllocator 使用的分配器是使用的PooledByteBufAllocator 来进行分配ByteBuf 这里跟着代码进去 创建一个默认的分配器 ,通过 io.netty.allocator.type 参数就能修改分配器 ;并且在分配器中有获取方法
- 分配的io的bytebuf,并在iobuffer 中申请对应的bufferr,
/**
* 分配一个{@link ByteBuf},最好是一个适合I/O的直接缓冲区。
*/
ByteBuf ioBuffer();
@Override
public ByteBuf ioBuffer() {
if (PlatformDependent.hasUnsafe()) {
return directBuffer(DEFAULT_INITIAL_CAPACITY);
}
return heapBuffer(DEFAULT_INITIAL_CAPACITY);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newHeapBuffer(initialCapacity, maxCapacity);
}
- 在 newHeapBuffer 中就会创建一个池化的内存块,PoolArena
- PoolArena 这个就代表内存中一大块连续区域,由多个chunk 内存块组成
- 从当前线程的回收栈中取buf,没有可用的就会取创建一个新的。
- 当前线程都在Eventloop线程中,分配一个buf,会先在线程变量中维护了一个栈,弹出一个handle对象,创建新的handle PoolunsafeDirectByteBuf
- PoolunsafeDirectByteBuf 在PooledUnsafeDirectByteBuf 进行实现的newObject
- 分配内存空间,在bytebuf进行扩容的时候,会做容量的规整,到16的倍数,然后在判断当前buf所需要大小然后拿到那个内存空间。
这就是整个缓存的内存分配 过程。 直接使用 ByteBufAllocator 来 进行分配内存
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
// tiny
ByteBuf buf1 = allocator.directBuffer(495);// 分配的内存最大长度为496
System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
buf1.release();// 此时会被回收到tiny的512b格子中
ByteBuf buf2 = allocator.directBuffer(495);// 从tiny的512b格子去取
System.out.printf("buf2: 0x%X%n", buf2.memoryAddress());
buf2.release();
- 使用release进行释放数据‘
零拷贝机制
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。利用java代码去实现零拷贝,将多个缓冲区显示为单个合并缓冲区的虚拟缓冲区。 不是像arraylist中 array.copy
- CompositeByteBuf,将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。 记录拷贝
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
-
wrapedBuffer()方法,将byte[]数组包装成ByteBuf对象。
ByteBuf newBuffer = Unpooled.wrappedBuffer(new byte[]{1,2,3,4,5});
-
slice()方法。将一个ByteBuf对象切分成多个ByteBuf对象。
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
将多个buf合并为一个,数组进行合并
ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
buffer1.writeByte(2);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
newBuffer.writeByte()
System.out.println(newBuffer);
总结
整个ByteBuf是Netty非常高效的原因,也是封装了大量api,为我们内置了动态扩容,池化非池化 unsafe、零拷贝机制,使得整个netty框架才如此高效,解决并发连接大的问题。
以上是关于Netty之ByteBuf原理解析及应用的主要内容,如果未能解决你的问题,请参考以下文章