Netty源码分析:accept
Posted HelloWorld_EE
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析:accept相关的知识,希望对你有一定的参考价值。
Netty源码分析:accept
在介绍accept之前,先介绍下NioserverSocketChannelConfig这个类,为什么先介绍这个类呢,这是因为:在accept客户端连接时会使用该类的maxMessagesPerRead这个字段,该字段的含义为:每次读的最大信息,Netty中将accept客户端连接也认为是一种读操作。对于accept客户端连接的这种读,利用该字段表示的是一次能够接受的最大连接数。目前这里只关注这一点。
1、NioServerSocketChannelConfig
在NioServerSocketChannel的构造函数中,我们看到会实例化一个NioServerSocketChannelConfig对象,本篇博文就介绍下这个类的构造函数做了哪些工作。
public NioServerSocketChannel(ServerSocketChannel channel)
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
NioServerSocketChannelConfig是NioServerSocketChannel的私有内部类。继承体系结构如下:
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket)
super(channel, javaSocket);
@Override
protected void autoReadCleared()
setReadPending(false);
该类中就有一个构造函数,以及重写了父类 DefaultServerSocketChannelConfig 的autoReadCheared()方法, 该方法通过调用setReadPending(false)方法将DefaultServerSocketChannelConfig的字段readPending设为false。
protected void setReadPending(boolean readPending)
this.readPending = readPending;
继续看 DefaultServerSocketChannelConfig 的构造函数:
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket)
super(channel);
if (javaSocket == null)
throw new NullPointerException("javaSocket");
this.javaSocket = javaSocket;
DefaultChannelConfig
public DefaultChannelConfig(Channel channel)
if (channel == null)
throw new NullPointerException("channel");
this.channel = channel;
if (channel instanceof ServerChannel || channel instanceof AbstractNioByteChannel)
// Server channels: Accept as many incoming connections as possible.
// NIO byte channels: Implemented to reduce unnecessary system calls even if it's > 1.
// See https://github.com/netty/netty/issues/2079
// TODO: Add some property to ChannelMetadata so we can remove the ugly instanceof
maxMessagesPerRead = 16;
else
maxMessagesPerRead = 1;
该构造函数中我们注意一下这个maxMessagesPerRead这个字段,对于ServerChannel和AbstractNioByteChannel的实例设置为16,原因在于:对于server channel我们需要接受尽可能多的连接。 对于 AbstractNioByteChannelNIO我们应该通过使其等于16(即大于1)来实现减少不必要的系统调用。
2、Netty源码分析:accept
在博文Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理的末尾我们分析了processSelectedKey这个方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch)
final NioUnsafe unsafe = ch.unsafe();
//检查该SelectionKey是否有效,如果无效,则关闭channel
if (!k.isValid())
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
try
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
unsafe.read();
if (!ch.isOpen()) //如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
// Connection already closed - no need to handle write.
return;
// 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记
if ((readyOps & SelectionKey.OP_WRITE) != 0)
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
// 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
if ((readyOps & SelectionKey.OP_CONNECT) != 0)
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
catch (CancelledKeyException ignored)
unsafe.close(unsafe.voidPromise());
该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况
1)OP_ACCEPT,接受客户端连接
2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。
3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。
4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。
本篇博文主要来看下当Boss 线程 selector检测到OP_ACCEPT事件时,内部干了些什么。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
unsafe.read();
if (!ch.isOpen()) //如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
// Connection already closed - no need to handle write.
return;
从代码中可以看到,当selectionKey发生的事件是SelectionKey.OP_ACCEPT,执行unsafe的read方法。注意这里的unsafe是NioMessageUnsafe的实例,具体可以在博文Netty源码分析:服务端启动全过程的NioServerSocketChannel的实例化过程中可以了解到。
下面来看下NioMessageUnsafe中的read方法
@Override
public void read()
assert eventLoop().inEventLoop();
//NioServerSocketChannelConfig
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending())
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
final int maxMessagesPerRead = config.getMaxMessagesPerRead();//16
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try
try
for (;;)
int localRead = doReadMessages(readBuf);
if (localRead == 0) //如果localRead等于0,说明没有连接进来
break;
if (localRead < 0)
closed = true;
break;
// stop reading and remove op
if (!config.isAutoRead())
break;
if (readBuf.size() >= maxMessagesPerRead)
break;
catch (Throwable t)
exception = t;
setReadPending(false);
int size = readBuf.size();
//对于每个客户端SocketChannel,在pipeline中寻找第一个Inbound=true的HandlerContext来对其进行处理。
for (int i = 0; i < size; i ++)
pipeline.fireChannelRead(readBuf.get(i));
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null)
if (exception instanceof IOException)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
pipeline.fireExceptionCaught(exception);
if (closed)
if (isOpen())
close(voidPromise());
finally
if (!config.isAutoRead() && !isReadPending())
removeReadOp();
对于上面的代码的基本逻辑如下:
1)在for循环中,通过调用方法doReadMessages来进行处理ServerSocketChannel的accept操作。 如果此时没有客户端连接,则退出for循环进行后续的处理,如果有客户端连接,则将客户端NioSocketChannel保存到readBuf中(默认不超过16个),如果超过16个,则也退出for循环进行后续的处理。
注:借用readBuf 这个List<Object>
是用来保存客户端NioSocketChannel,默认一次不超过16个.
//NioServerSocketChannel.java
@Override
protected int doReadMessages(List<Object> buf) throws Exception
//检测是否有客户端的连接进来
SocketChannel ch = javaChannel().accept();
try
if (ch != null)
buf.add(new NioSocketChannel(this, ch));
return 1;
catch (Throwable t)
logger.warn("Failed to create a new channel from an accepted socket.", t);
try
ch.close();
catch (Throwable t2)
logger.warn("Failed to close a socket.", t2);
return 0;
在博文 Java NIO 之 ServerSocketChannel SocketChannel中我们知道,ServerSocketChannel有阻塞和非阻塞两种模式:
a、阻塞模式:ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。
b、非阻塞模式:,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null.
在NioServerSocketChannel的构造函数分析中,我们知道,其通过ch.configureBlocking(false);
语句设置当前的ServerSocketChannel为非阻塞的。
2)第一步得到的客户端连接对象保存在了readBuf这个List中,然后对readBuf中保存的每个客户端SocketChannel,触发各自pipeline的ChannelRead事件,具体为:在pipeline中从head节点开始寻找第一个Inbound=true
的HandlerContext来对其进行处理,通过跟踪代码我们发现最终执行ServerBootstrapAcceptor的channelRead方法。至于ServerBootstrapAcceptor这个Inbound类型的handler是何时添加到Pipeline中以AbstractChannelHandlerContext为节点的链表中的,目前还不知道,后面我们将会解析。先看下ServerBootstrapAcceptor类的channelRead方法主要做了些什么。
ServerBootstrapAcceptor该方法的代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg)
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions)
try
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue()))
logger.warn("Unknown channel option: " + e);
catch (Throwable t)
logger.warn("Failed to set a channel option: " + child, t);
for (Entry<AttributeKey<?>, Object> e: childAttrs)
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
try
childGroup.register(child).addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future) throws Exception
if (!future.isSuccess())
forceClose(child, future.cause());
);
catch (Throwable t)
forceClose(child, t);
1、通过child.pipeline().addLast(childHandler)添加childHandler到NioSocketChannel的pipeline。其中childHandler是通过ServerBootstrap的childHandler方法进行配置的。
2、通过childGroup.register(child)将NioSocketChannel注册到work的eventLoop中,这个过程和NioServerSocketChannel注册到boss的eventLoop的过程一样,最终由work线程对应的selector进行read事件的监听。
小结
看到这里我们就明白了一点:Boss NioEventLoopGroup中的NioEventLoop只负责accpt客户端连接,然后将该客户端注册到Work NioEventLoopGroup中的NioEventLoop中,即最终是由work线程对应的selector来进行read等时间的监听。
以上是关于Netty源码分析:accept的主要内容,如果未能解决你的问题,请参考以下文章