Netty Source Code Analysis: read
Posted by HelloWorld_EE
At the end of the post Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理 we analyzed the processSelectedKey method:
```java
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    // Check that the SelectionKey is still valid; if not, close the channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // If READ or ACCEPT is ready, trigger unsafe.read(); comparing against 0 works around
        // the JDK spin-loop bug mentioned in the English comment above
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }

        // If WRITE is ready, flush the buffered data; once the outbound buffer is empty,
        // the OP_WRITE flag is cleared
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // On OP_CONNECT, the OP_CONNECT flag must be removed, otherwise Selector.select(timeout)
        // returns immediately without blocking, which can drive the CPU to 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
```
This method inspects the SelectionKey k and distinguishes the following cases:

1) OP_ACCEPT: a client connection can be accepted.
2) OP_READ: the Channel has received new data that the upper layer can read.
3) OP_WRITE: the upper layer can write data to the Channel.
4) OP_CONNECT: the TCP connection has been established and the Channel is in the active state.

This post looks at what happens internally when a worker thread's selector detects an OP_READ event.
```java
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
    }
}
```
As the code shows, when the selectionKey's ready event is SelectionKey.OP_READ, unsafe.read() is executed. Note that unsafe here is an instance of NioByteUnsafe.

Why is unsafe a NioByteUnsafe instance here? In the previous post Netty源码分析:accept we saw that a NioEventLoop in the boss NioEventLoopGroup only accepts client connections and then registers each accepted client with a NioEventLoop in the worker NioEventLoopGroup; from then on it is the worker thread's selector that listens for read and other events. The channel on the worker side is a SocketChannel, and a SocketChannel's unsafe is a NioByteUnsafe instance. For the details, see the post Netty源码分析:客户端连接.
Let's look at NioByteUnsafe's read method:
```java
@Override
public void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            // 1. Allocate a buffer
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes(); // writable capacity in bytes
            // 2. Read the socketChannel's data into the buffer
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // nothing was read, release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            // 3. Fire the pipeline's channelRead event so the byteBuf gets processed downstream
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
```
Now let's walk through the important parts of this code one by one.
1. final ByteBufAllocator allocator = config.getAllocator()

This line obtains the buffer allocator. By default it is an UnpooledByteBufAllocator instance; this can be changed through the system property io.netty.allocator.type. If io.netty.allocator.type is set to "pooled", the allocator becomes PooledByteBufAllocator.

Why is that the behavior? Tracing the code, everything ends up at the DEFAULT_ALLOCATOR constant of ByteBufUtil, which is initialized as follows:
```java
// ByteBufUtil
static final ByteBufAllocator DEFAULT_ALLOCATOR;

static {
    // some code omitted
    String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: unpooled (unknown: {})", allocType);
    }
    DEFAULT_ALLOCATOR = alloc;
    // some code omitted
}
```
The conclusion: by default allocType is unpooled, so the allocator alloc is an UnpooledByteBufAllocator instance. The io.netty.allocator.type system property switches it.
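To switch to the pooled allocator, the usual way is the JVM flag -Dio.netty.allocator.type=pooled. Here is a minimal sketch of doing it programmatically (the class name AllocTypeDemo is made up for illustration); note that the property must be set before any Netty buffer class is initialized, because DEFAULT_ALLOCATOR is resolved in a static initializer:

```java
import io.netty.buffer.ByteBufAllocator;

public final class AllocTypeDemo {
    public static void main(String[] args) {
        // Must run before Netty's buffer classes are loaded, since
        // ByteBufUtil resolves DEFAULT_ALLOCATOR in a static block.
        System.setProperty("io.netty.allocator.type", "pooled");
        // ByteBufAllocator.DEFAULT delegates to ByteBufUtil.DEFAULT_ALLOCATOR
        System.out.println(ByteBufAllocator.DEFAULT.getClass().getSimpleName());
        // expected output: PooledByteBufAllocator
    }
}
```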
2. How allocHandle is instantiated

allocHandle adaptively adjusts the size of each buffer allocation, so that buffers are neither too large nor too small. First, its instantiation:
```java
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
```
Here config.getRecvByteBufAllocator() returns the AdaptiveRecvByteBufAllocator singleton DEFAULT:

```java
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
```
AdaptiveRecvByteBufAllocator's newHandle() method looks like this:
```java
@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}
```
What do the parameters minIndex, maxIndex and initial used above mean? minIndex is the index in SIZE_TABLE of the minimum buffer size; maxIndex is the index in SIZE_TABLE of the maximum buffer size; initial is the initial buffer size.

These parameters may seem to come out of nowhere, so let's look at AdaptiveRecvByteBufAllocator's fields and constructor. The constant fields first:
```java
public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private static final int[] SIZE_TABLE;
```
These fields mean the following:

1) SIZE_TABLE: stores, in ascending order, the buffer sizes that can be allocated: starting at 16 and growing by 16 up to 496, then starting at 512 and doubling until int overflow. It is initialized as follows:
```java
static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }
    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }
    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}
```
2) DEFAULT_MINIMUM: the minimum buffer size (64); its index in SIZE_TABLE is 3.

3) DEFAULT_MAXIMUM: the maximum buffer size (65536); its index in SIZE_TABLE is 38.

4) DEFAULT_INITIAL: the initial buffer size. The first allocation has no previous read to use as a reference, so a default initial value is needed.

5) INDEX_INCREMENT: how much the index grows next time when the previous estimate turned out too small.

6) INDEX_DECREMENT: how much the index shrinks next time when the previous estimate turned out too large.
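For reference, these two constants are used by HandleImpl.record(int), which read() calls with the total bytes read. The sketch below paraphrases the Netty 4.0.x implementation from memory (decreaseNow is a boolean field of HandleImpl, not shown in the excerpts above), so treat it as a sketch rather than authoritative code: the estimate shrinks only after two consecutive small reads, but grows immediately after a read that filled the buffer:

```java
@Override
public void record(int actualReadBytes) {
    // Two consecutive reads below the next-smaller bucket shrink the estimate...
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = Math.max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    // ...while a single read that used the whole buffer grows it right away.
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = Math.min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}
```

This asymmetry (grow by 4 index steps, shrink by 1, and only after two small reads) makes the allocator react quickly to bursts while staying conservative about giving memory back.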
The constructor:
```java
private AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }

    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }

    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }

    this.initial = initial;
}
```
The constructor validates its arguments and then initializes the three fields below; these are exactly the parameters used above to create the allocHandle object:
```java
private final int minIndex;
private final int maxIndex;
private final int initial;
```
The getSizeTableIndex function, shown below, finds the position in SIZE_TABLE of the element that is just greater than or equal to size:
```java
private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            // here we have a < size <= b
            return mid + 1;
        }
    }
}
```
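To make the index arithmetic concrete, here is a small self-contained sketch (class name SizeTableDemo is made up) that rebuilds SIZE_TABLE with the same loops and confirms the index claims above: 64 lands at index 3 and 65536 at index 38:

```java
import java.util.ArrayList;
import java.util.List;

public final class SizeTableDemo {
    public static void main(String[] args) {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);   // 16, 32, ..., 496 -> indices 0..30
        }
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);   // 512, 1024, ...   -> indices 31, 32, ...
        }
        System.out.println(sizeTable.indexOf(64));    // 3  (DEFAULT_MINIMUM)
        System.out.println(sizeTable.indexOf(1024));  // 32 (DEFAULT_INITIAL)
        System.out.println(sizeTable.indexOf(65536)); // 38 (DEFAULT_MAXIMUM)
    }
}
```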
3. byteBuf = allocHandle.allocate(allocator)

This requests a buffer of the estimated size:
```java
// AdaptiveRecvByteBufAllocator.HandleImpl
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(nextReceiveBufferSize);
}
```
It simply delegates to the ioBuffer method; let's keep following:
```java
// AbstractByteBufAllocator.java
@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}
```
The main logic of ioBuffer: check whether the platform supports Unsafe, and choose direct (off-heap) memory or heap memory accordingly.
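As a quick way to see which branch your platform would take, here is a throwaway probe (class name UnsafeProbe is made up; this is not part of Netty's flow):

```java
import io.netty.util.internal.PlatformDependent;

public final class UnsafeProbe {
    public static void main(String[] args) {
        // true -> ioBuffer(...) returns a direct buffer; false -> a heap buffer.
        // Unsafe can be disabled explicitly with the JVM flag -Dio.netty.noUnsafe=true.
        System.out.println(PlatformDependent.hasUnsafe());
    }
}
```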
First, heapBuffer:
```java
// AbstractByteBufAllocator.java
@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
```
newHeapBuffer here has two implementations; which one runs depends on the io.netty.allocator.type system property. If it is set to "pooled", the allocator is PooledByteBufAllocator and memory is allocated from an object pool; otherwise the allocator is UnpooledByteBufAllocator, which directly returns a new UnpooledHeapByteBuf object.
```java
// UnpooledByteBufAllocator.java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
```
```java
// PooledByteBufAllocator.java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
```
How PooledByteBufAllocator's newHeapBuffer allocates memory is described in detail in the posts Netty源码分析:PooledByteBufAllocator and Netty源码分析:PoolArena.
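Besides the global system property, the allocator can also be chosen per bootstrap through ChannelOption.ALLOCATOR. A hedged sketch of the usual server-side wiring (class and method names are illustrative, the option itself is real):

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;

public final class AllocatorConfig {
    static ServerBootstrap configure(ServerBootstrap b) {
        // Use the pooled allocator for both the parent (boss) channel and
        // every accepted child channel, regardless of io.netty.allocator.type.
        return b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
}
```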
Now directBuffer:
```java
// AbstractByteBufAllocator.java
@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity); // argument validation
    return newDirectBuffer(initialCapacity, maxCapacity);
}
```
Just like newHeapBuffer, newDirectBuffer has two implementations, again selected by the io.netty.allocator.type system property: "pooled" selects PooledByteBufAllocator with pooled allocation, anything else selects UnpooledByteBufAllocator. PooledByteBufAllocator.newDirectBuffer is covered in the post Netty源码分析:PoolArena; here we look at UnpooledByteBufAllocator.newDirectBuffer:
```java
// UnpooledByteBufAllocator.java
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}
```
Let's look at the UnpooledUnsafeDirectByteBuf constructor first (UnpooledDirectByteBuf is similar and is skipped here); the toLeakAwareBuffer method follows afterwards. How does UnpooledUnsafeDirectByteBuf manage its memory? It wraps NIO's ByteBuffer and requests the memory through ByteBuffer's allocateDirect method:
```java
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    // some argument checks omitted
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity));
}
```
The call super(maxCapacity) stores the maxCapacity argument in the maxCapacity field of AbstractByteBuf, the parent class of its parent AbstractReferenceCountedByteBuf.
```java
protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}
```
The main logic of the code above:

1. Allocate a direct buffer of size initialCapacity via ByteBuffer's allocateDirect method.
2. Free the old buffer, if there is one.
3. Store the new buffer in the buffer field.

memoryAddress = PlatformDependent.directBufferAddress(buffer) records the buffer's address field, which points at the native memory; capacity = buffer.remaining() records the buffer's capacity.
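The NIO calls involved behave as follows; this plain-JDK snippet (class name DirectBufferDemo is made up) is shown only to make the capacity bookkeeping concrete:

```java
import java.nio.ByteBuffer;

public final class DirectBufferDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        // A fresh buffer has position = 0 and limit = capacity,
        // so remaining() equals the full capacity that Netty records.
        System.out.println(buffer.isDirect());  // true
        System.out.println(buffer.remaining()); // 1024
    }
}
```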
Next, the toLeakAwareBuffer(buf) method:
```java
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeak leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
    }
    return buf;
}
```
toLeakAwareBuffer(buf) wraps the freshly allocated buf once more, choosing the wrapper class according to the value of ResourceLeakDetector.getLevel().

Netty manages resources with reference counting. ByteBuf implements the ReferenceCounted interface: a newly instantiated ByteBuf has a reference count of 1; code that needs to hold on to the object calls retain() to increment the count, and release() to decrement it when done. When the count reaches 0, the object frees the underlying resource it holds or returns it to the pool.
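A minimal sketch of that contract using the public Unpooled helper (class name RefCountDemo is made up; this is the ReferenceCounted API in isolation, not part of the read path):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public final class RefCountDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.directBuffer(256);
        System.out.println(buf.refCnt()); // 1: a new buffer starts with one reference
        buf.retain();                     // another holder keeps a reference
        System.out.println(buf.refCnt()); // 2
        buf.release();                    // that holder is done
        buf.release();                    // last reference gone -> memory is freed
        System.out.println(buf.refCnt()); // 0
    }
}
```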
After this long detour through buffer allocation, let's return to the AbstractNioByteChannel read method and continue.
4. The doReadBytes method

Next, doReadBytes, which reads the socketChannel's data into the buffer:
```java
// NioSocketChannel.java
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
```
This reads the Channel's data into the byteBuf. Following the call:
```java
// WrappedByteBuf.java
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    return buf.writeBytes(in, length);
}
```
```java
// AbstractByteBuf.java
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}
```
setBytes has several implementations; here is UnpooledUnsafeDirectByteBuf's:
```java
// UnpooledUnsafeDirectByteBuf.java
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1; // the Channel has already been closed, so return -1
    }
}

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}
```
At the bottom, the read is always performed through a ByteBuffer: whether the buffer is a PooledByteBuf or an UnpooledXXXByteBuf, its underlying data structure (a ByteBuffer or an array) is converted to a ByteBuffer for the read. In other words, both pooled and unpooled buffers keep a ByteBuffer tmpNioBuf, and it is this tmpNioBuf that actually receives the content read from the Channel. At this point the channel's data has been read into the buffer.
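Note that buffer.duplicate() is what makes this safe: the duplicate shares the same memory as the original but has its own position and limit, so setBytes can reposition tmpNioBuf freely without disturbing the owning buffer. A plain-JDK illustration (class name DuplicateDemo is made up):

```java
import java.nio.ByteBuffer;

public final class DuplicateDemo {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocateDirect(16);
        ByteBuffer dup = original.duplicate();

        dup.position(4).limit(8);                // move the duplicate's cursors only
        System.out.println(original.position()); // 0: the original is untouched
        dup.put((byte) 42);                      // ...but writes go to the shared memory
        System.out.println(original.get(4));     // 42
    }
}
```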
Back to the AbstractNioByteChannel read method once more:
```java
@Override
public void read() {
    // ...
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // nothing was read, release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
```
Consider the return value of int localReadAmount = doReadBytes(byteBuf):

1. If it returns 0, no data was read, and the loop exits.
2. If it returns -1, the peer has closed the connection, and the loop exits.
3. Otherwise data was read. Once the data is in the buffer, the pipeline's channelRead event is fired with the byteBuf as its argument, and user-defined inbound handlers can now do their business logic (a minimal handler sketch follows this list; pipeline event handling is described in detail in the post Netty源码分析:ChannelPipeline). After that, the loop reads from the Channel again until one of the exit conditions holds.

Besides a return value of 0 or -1, the loop also exits in the following cases:

3.1) the total number of bytes read into the buffer would exceed Integer.MAX_VALUE - localReadAmount; the loop stops to avoid overflow;
3.2) the config's autoRead is false;
3.3) the loop has iterated maxMessagesPerRead times, i.e. at most maxMessagesPerRead reads are issued per OP_READ event even if data remains. The previous post used the same variable on the boss side: when the boss thread's selector detects OP_ACCEPT, it accepts at most maxMessagesPerRead client connections in one pass.
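For completeness, here is a hedged sketch of a user-level inbound handler on the receiving end of pipeline.fireChannelRead (handler and class names are illustrative, not from the post):

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg; // the buffer allocated in NioByteUnsafe.read()
        try {
            System.out.println("received " + buf.readableBytes() + " bytes");
        } finally {
            buf.release(); // this handler is the last consumer, so drop the reference
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // fired once per read() pass, after the do-while loop above finishes
        ctx.flush();
    }
}
```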