Netty源码分析-NioByteUnsafe(read读取流程)
Posted 征服.刘华强
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-NioByteUnsafe(read读取流程)相关的知识,希望对你有一定的参考价值。
NioByteUnsafe封装了NiosocketChannel读取底层数据的流程。
NioEventLoop负责监听Selector上所有的事件,当发生事件时根据事件类型调用Channel的UnSafe中的方法去处理。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch)
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid())
final EventLoop eventLoop;
try
eventLoop = ch.eventLoop();
catch (Throwable ignored)
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this)
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
try
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0)
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0)
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
unsafe.read();
catch (CancelledKeyException ignored)
unsafe.close(unsafe.voidPromise());
NioByteUnsafe当中的read方法
1、分配ByteBuf。
2、从底层SocketChannel中读取字节,封装到ByteBuf中。
3、调用Channel的PPLine交给管道中的编解码器去处理。
4、调用各种事件。
@Override
public final void read()
final ChannelConfig config = config();
if (shouldBreakReadReady(config))
clearReadPending();
return;
//每个channel对应一个PPLine
final ChannelPipeline pipeline = pipeline();
//ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
//容量计算器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//重置,把之前计数的值全部清空
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try
do
//分配内存,关键在于计算分配内存的大小(小了不够,大了浪费)
byteBuf = allocHandle.allocate(allocator);
//doReadBytes,从socket读取字节到byteBuf,返回真实读取数量
//更新容量计算器
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果小于0 则socket关闭,如果等于0则没读取到数据
if (allocHandle.lastBytesRead() <= 0)
// nothing was read. release the buffer.
//释放资源
byteBuf.release();
byteBuf = null;
//如果小于0则意味着socket关闭
close = allocHandle.lastBytesRead() < 0;
if (close)
// There is nothing left to read as we received an EOF.
readPending = false;
break;
//增加循环计数器
allocHandle.incMessagesRead(1);
readPending = false;
//把读取到的数据,交给管道去处理
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//判断是否继续从socket读取数据
while (allocHandle.continueReading());
//读取完成后调用readComplete,重新估算内存分配容量
allocHandle.readComplete();
//事件激发
pipeline.fireChannelReadComplete();
//如果需要关闭,则处理关闭
if (close)
closeOnRead(pipeline);
catch (Throwable t)
handleReadException(pipeline, byteBuf, t, close, allocHandle);
finally
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
//
//根据情况移除OP_READ事件
if (!readPending && !config.isAutoRead())
removeReadOp();
在读取中遇到异常,或者返回EOF(-1),说明底层通道关闭,需要处理关闭逻辑。
private void closeOnRead(ChannelPipeline pipeline)
//底层输入流未关闭
if (!isInputShutdown0())
//是否允许办关闭
if (isAllowHalfClosure(config()))
//关闭输入流
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
else
//关闭socket
close(voidPromise());
else
//激发事件
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
异常处理的逻辑
//异常处理
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle)
if (byteBuf != null)
//如果byteBuf有数据,则交给管道处理
if (byteBuf.isReadable())
readPending = false;
pipeline.fireChannelRead(byteBuf);
else
//释放资源
byteBuf.release();
//容量计算机更新guess大小
allocHandle.readComplete();
//激发读完成事件和异常事件
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
//关闭socket
if (close || cause instanceof IOException)
closeOnRead(pipeline);
关闭socket的逻辑,需要释放输出队列缓存,触发关闭事件,关闭底层socket等。
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify)
//设置不能取消
if (!promise.setUncancellable())
return;
//防止二次关闭,如果已经关闭
if (closeInitiated)
if (closeFuture.isDone())
// Closed already.
safeSetSuccess(promise);
else if (!(promise instanceof VoidChannelPromise)) // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future) throws Exception
promise.setSuccess();
);
return;
closeInitiated = true;
//判断当前连接是否还连着
final boolean wasActive = isActive();
//输出缓存队列
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose();
if (closeExecutor != null)
closeExecutor.execute(new Runnable()
@Override
public void run()
try
// Execute the close.
//调用底层关闭socket
doClose0(promise);
finally
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable()
@Override
public void run()
if (outboundBuffer != null)
// Fail all the queued messages
//删除队列缓存中还没发出去的消息,释放资源
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
//激发fireChannelInactive事件
//激发fireChannelUnregistered事件
fireChannelInactiveAndDeregister(wasActive);
);
);
else
try
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
finally
if (outboundBuffer != null)
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
if (inFlush0)
invokeLater(new Runnable()
@Override
public void run()
fireChannelInactiveAndDeregister(wasActive);
);
else
fireChannelInactiveAndDeregister(wasActive);
技术交流QQ:212320390
关注公众号
以上是关于Netty源码分析-NioByteUnsafe(read读取流程)的主要内容,如果未能解决你的问题,请参考以下文章