Netty Source Code Analysis
boot.bind, rough flow: boot.bind registers a server channel on the boss loop (interested in accept events), then starts a thread that keeps running the selector. When an event fires, a channel is created to communicate with the client, and that channel is registered on a worker loop, which starts a thread of its own that likewise keeps selecting. Note: a channel is registered on exactly one loop, while one loop can host many channels.
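For orientation, here is a minimal, self-contained bootstrap that exercises this flow. It is only a sketch: the port and the empty inbound handler are placeholders, not something from the original walkthrough.
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public final class BindSketch {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);  // selects for accept events
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // selects for the accepted channels
        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // placeholder; real user handlers go here
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
                    }
                });
            // bind(): registers the server channel on a boss loop, then binds the socket
            ChannelFuture f = boot.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}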
How the channel's pipeline is built: both bind and connect enter initAndRegister, which calls channelFactory.newChannel(); for a server this runs the NioServerSocketChannel constructor, which uses the provider to open the underlying JDK channel. After construction, init(channel) adds a ChannelInitializer to the channel's pipeline; the ChannelInitializer's job is to add the user's handlers when its initChannel runs. On the server side an extra ServerBootstrapAcceptor is also added (its job: when a client connection comes in, register the new channel with the workerGroup). So after newChannel and init(channel), the channel exists and carries a single ChannelInitializer.
init(channel) is followed by group.register(channel): the group picks a loop via its chooser, makes that loop the channel's loop (binding the channel to the loop), and submits register0(promise) as a task to the loop. The loop starts its thread (exactly one is ever started), and that thread runs SingleThreadEventExecutor.this.run(). That run() selects according to the select strategy and whether tasks are pending, then calls processSelectedKeys (unsafe.read() ---> pipeline.fireChannelRead ---> pipeline processing), then runAllTasks(), guaranteeing that every submitted task executes.
Inside register0(promise), doRegister() performs the JDK-level operation that registers the Channel on the Selector; afterwards pipeline.invokeHandlerAddedIfNeeded() finishes building the pipeline (the user's handlers are added and the ChannelInitializer removes itself).
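The self-removing behavior of ChannelInitializer can be pictured with this deliberately simplified sketch (not Netty's actual source, which also covers handlerAdded and re-entrancy):
import io.netty.channel.*;
// simplified sketch: add the user's handlers once, then remove yourself
public abstract class InitializerSketch extends ChannelInboundHandlerAdapter {
    protected abstract void initChannel(Channel ch) throws Exception;
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel(ctx.channel());              // user code adds the real handlers here
        ctx.pipeline().remove(this);             // the initializer removes itself
        ctx.pipeline().fireChannelRegistered();  // re-fire so the newly added handlers see the event
    }
}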
Difference between boot.handler and boot.childHandler on the server side: handler applies to the serverChannel itself, i.e. at accept time. childHandler applies after a client has connected: the server creates a channel to talk to that client, and the handlers configured via childHandler apply to that channel.
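A compact sketch of the distinction, reusing the groups from the sketch above (LoggingHandler ships with netty-handler):
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
ServerBootstrap boot = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))        // fires for the ServerChannel (the accept side)
        .childHandler(new LoggingHandler(LogLevel.DEBUG)); // fires for every accepted child channel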
Reference: https://www.cnblogs.com/yuand...
// Source code walkthrough
Initializing the EventLoopGroup:
This ultimately reaches MultithreadEventExecutorGroup.MultithreadEventExecutorGroup():
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// This executor runs the NioEventLoops' tasks; the thread each NioEventLoop binds to is created by it.
// The threads it constructs are FastThreadLocalThread instances.
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// build the EventExecutor (EventLoop) array
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i++) {
boolean success = false;
try {
// initialize each EventLoop -- these are the worker loops;
// this mainly assigns the provider and the selector
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) { // if any child failed, shut them all down
for (int j = 0; j < i; j++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) { // on failure, loop until this child has terminated
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// when a task is submitted to the pool, the chooser picks which child executor runs it
chooser = chooserFactory.newChooser(children);
// listener that completes the group's terminationFuture once every child has terminated
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e : children) {
e.terminationFuture().addListener(terminationListener);
}
// expose the children as a read-only set
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
newChild() ultimately calls NioEventLoopGroup.newChild(), which invokes NioEventLoop.NioEventLoop():
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
// store the SelectorProvider
provider = selectorProvider;
// open a selector from the provider -- this is the plain NIO selector that does the polling
selector = openSelector();
// the strategy used when calling selector.select()
selectStrategy = strategy;
}
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); calls SingleThreadEventExecutor.SingleThreadEventExecutor():
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
// whether adding a task should wake up the loop
this.addTaskWakesUp = addTaskWakesUp;
// the maximum number of pending tasks
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// the executor that runs tasks (it supplies the thread)
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// the task queue
taskQueue = newTaskQueue(this.maxPendingTasks);
// the rejection policy
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
eventLoopGroup initialization -- the constructor parameters:
nThreads: the number of threads in the pool, i.e. the number of NioEventLoop instances; by default nThreads is set to CPU cores * 2.
executor: used by the NioEventLoops; the thread each NioEventLoop runs its tasks on is created by it.
chooserFactory: when a task is submitted to the pool, one of its children must be chosen (choose) to run it; this factory supplies that selection strategy (sketched after this list).
selectorProvider: instantiates the JDK Selector; each group holds one selectorProvider instance.
selectStrategyFactory: the strategy a NioEventLoop consults when selecting.
rejectedExecutionHandler: the pool's rejection policy.
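The chooser can be pictured as plain round-robin. A simplified sketch follows; Netty's DefaultEventExecutorChooserFactory additionally swaps in a bit-mask variant when the pool size is a power of two:
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.atomic.AtomicInteger;
// simplified round-robin chooser sketch
final class RoundRobinChooserSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;
    RoundRobinChooserSketch(EventExecutor[] executors) {
        this.executors = executors;
    }
    EventExecutor next() {
        // with a power-of-two length this can be executors[idx.getAndIncrement() & (executors.length - 1)]
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}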
Construction of the NioEventLoops (newChild is called during group initialization): while the eventLoopGroup initializes, it creates nThreads loops by calling newChild(), which runs the NioEventLoop constructor. Construction obtains a platform-appropriate provider and opens a selector from it, so every loop owns its own selector. Note: every loop is backed by a single thread whose main job is to pull tasks from the taskQueue and run them; that thread is created by loopGroup.executor. bind/connect is, at bottom, just a task submitted to a loop's taskQueue. When a new connection is accepted by the server, the server's ServerBootstrapAcceptor registers the newly opened channel with the workGroup (a loop is picked via the chooser strategy), which again is just a task submitted to that loop's taskQueue. The run cycle is sketched below.
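The run cycle can be pictured with this self-contained, deliberately simplified sketch in plain JDK NIO (not Netty's source): one thread owns one Selector and alternates between selecting I/O events and draining its task queue.
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
// one thread <-> one selector <-> one task queue, as in NioEventLoop
final class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
    MiniEventLoop(Selector selector) {
        this.selector = selector;
    }
    void execute(Runnable task) { // what bind/connect/register boil down to
        taskQueue.add(task);
        selector.wakeup(); // wake a blocked select() so the task runs promptly
    }
    @Override
    public void run() {
        for (;;) {
            try {
                // Netty consults its SelectStrategy here (e.g. selectNow() when tasks are pending)
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // Netty dispatches here: processSelectedKey(key, channel) -> unsafe.read() -> pipeline
                }
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run(); // runAllTasks(): every submitted task eventually executes
                }
            } catch (IOException e) {
                return; // the real loop logs and keeps going (or rebuilds the selector)
            }
        }
    }
}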
The bind process
private ChannelFuture doBind(final SocketAddress localAddress) {
// initialize and register the channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// perform the socket bind
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
The initAndRegister method: creates and initializes the channel
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// create the channel
channel = channelFactory.newChannel();
// initialize the channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// register the channel with the group
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
The newChannel method: creates the channel. The configured channel factory creates it reflectively and the channel's interest op is set; ultimately provider.openServerSocketChannel() / provider.openSocketChannel() opens the underlying JDK channel.
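For reference, .channel(Class) merely installs a ReflectiveChannelFactory, so newChannel() boils down to a reflective no-arg constructor call. Roughly (a sketch of the equivalence, not quoted source):
// what .channel(...) does under the hood, approximately:
boot.channel(NioServerSocketChannel.class);
// is roughly equivalent to
boot.channelFactory(new ReflectiveChannelFactory<NioServerSocketChannel>(NioServerSocketChannel.class));
// and newChannel() then effectively performs NioServerSocketChannel.class.getConstructor().newInstance()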
The init method: sets the channel's options and attributes and sets up the pipeline
void init(Channel channel) throws Exception {
// apply the options
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
// apply the attributes
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// set up the pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// a ChannelInitializer is added here; it in turn adds the ServerBootstrapAcceptor, which registers newly connected channels with the worker group
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception { // invoked when the channel is registered
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// handler here is the user-configured handler for the server channel and may itself be a ChannelInitializer.
// To guarantee that all user handlers are added before the ServerBootstrapAcceptor,
// the acceptor's addition must be deferred.
ch.eventLoop().execute(new Runnable() { // by the time this runs, registration is complete and events are being handled
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
The register method: registers the channel with the group
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// bind the channel to its eventLoop
AbstractChannel.this.eventLoop = eventLoop;
// if already on the loop's own thread, call register0 directly
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// otherwise submit a task to the eventLoop
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
The register0 method:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// the actual registration happens here
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// trigger the user-defined handlerAdded(...) callbacks
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// propagate a register event through the pipeline; every handler interested in register gets its callback
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// On the first registration this branch is not entered: the socket is not bound yet, so the channel is not active.
// The channel only becomes active once the port is bound; without a bound port it cannot be active.
// Propagates the active event through the pipeline.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
The invokeHandlerAddedIfNeeded method: on the first registration it calls callHandlerAddedForAllHandlers
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
The callHandlerAddedForAllHandlers method:
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
// while loop: walk all pending handler callbacks
while (task != null) {
task.execute();
task = task.next;
}
}
Which finally reaches:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// trigger every handler's handlerAdded method;
// for a ChannelInitializer this ends up calling initChannel (the ChannelInitializer added during the server bind fires here)
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
}
....
}
The fireChannelRegistered method:
public final ChannelPipeline fireChannelRegistered() {
// propagation starts at head
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
The (static) invokeChannelRegistered method:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
// still executed on the thread of the channel's eventLoop
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
The (instance) invokeChannelRegistered method:
private void invokeChannelRegistered() {
// make sure the handler has been fully added
if (invokeHandler()) {
try {
// only ChannelInboundHandler instances get the callback
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
Entering head's channelRegistered method:
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
The ctx.fireChannelRegistered method:
public ChannelHandlerContext fireChannelRegistered() {
// findContextInbound finds the next inbound handler after head and fires the register event on it
invokeChannelRegistered(findContextInbound());
return this;
}
The doRegister method: where the channel is actually registered with the JDK Selector
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// Register the channel with the eventLoop's selector.
// Note that ops is 0 here: the registration merely succeeds; the ServerSocketChannel is not yet listening for any network operation.
// The registration is polymorphic: a NioServerSocketChannel registers to listen for client connects,
// and a SocketChannel registers to listen for reads or writes.
// SelectionKey.interestOps(int ops) can conveniently adjust the interest set later,
// so at this point we only need to obtain the SelectionKey and assign it to AbstractNioChannel's selectionKey field.
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
After initAndRegister comes the doBind0 method: it performs the socket bind, and listening really starts here
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// submit this task to the channel's eventLoop
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) { // the channel was registered successfully
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
The bind method: we finally enter
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(promise, false)) {
// cancelled
return promise;
}
// findContextOutbound walks backwards from tail, looking for an outbound handler
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
The invokeBind method:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// ordinary handlers do not override bind, so this ultimately ends up in head
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
head's bind method:
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// the actual bind happens here
unsafe.bind(localAddress, promise);
}
The AbstractChannel.AbstractUnsafe bind method:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can\'t receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// after doBind the channel is bound to the socket and has become active
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
doBind: the final bind lands in the NIO channel implementation (NioServerSocketChannel.doBind on the server side), where the underlying JDK socket is bound to the channel:
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress);
} else {
javaChannel().socket().bind(localAddress);
}
}
After binding, pipeline.fireChannelActive() is called, propagating the channel-active event through the pipeline.
head's channelActive method calls readIfIsAutoRead,
which propagates a read event down the pipeline; every handler sees the read, and it finally reaches head's read method, which updates the channel's interest ops on the selector. The interest op itself was fixed when the channel was constructed.
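head's read ends up in AbstractNioChannel.doBeginRead, which looks roughly like this (paraphrased; readInterestOp is the op fixed in the channel constructor: OP_ACCEPT for NioServerSocketChannel, OP_READ for NioSocketChannel):
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // add the interest op that was chosen at construction time
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}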
// Reference: https://blog.csdn.net/u013828...
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
// check whether this SelectionKey is still valid; if not, close the channel
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// If READ or ACCEPT is ready, trigger unsafe.read(); the readyOps == 0 check works around a JDK bug that could otherwise cause a spin loop, as the English comment above says.
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) { // already closed -- just return; no need to handle this channel's other events
// Connection already closed - no need to handle write.
return;
}
}
// If WRITE is ready, send the buffered data; once the buffer is fully written, the previously set OP_WRITE flag is cleared
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// For OP_CONNECT, the OP_CONNECT interest must be removed; otherwise Selector.select(timeout) returns immediately without blocking, which can drive the CPU to 100%
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
// This method inspects the SelectionKey k; there are several distinct cases:
1) OP_ACCEPT: accept a client connection.
2) OP_READ: readable event, i.e. the Channel has received new data for the upper layer to read.
3) OP_WRITE: writable event, i.e. the upper layer may write data into the Channel.
4) OP_CONNECT: connection-established event, i.e. the TCP connection is up and the Channel is active.
// The read path: unsafe.read(). unsafe is initialized when the channel is constructed; for a NioServerSocketChannel the unsafe field is a NioMessageUnsafe.
// In essence: doReadMessages(readBuf) --> pipeline.fireChannelRead(readBuf.get(i)) --> pipeline.fireChannelReadComplete()
public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
// readBuf is a List<Object> field of NioMessageUnsafe
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
}
The doReadMessages method: on the server side this is NioServerSocketChannel.doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
// creates a new NioSocketChannel; its constructor ultimately sets the read interest op for it
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
new NioSocketChannel(this, ch) ultimately calls:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// the initial interest op is OP_READ
super(parent, ch, SelectionKey.OP_READ);
}
pipeline.fireChannelRead(readBuf.get(i)) triggers the channelRead method of every inbound handler on the pipeline, including ServerBootstrapAcceptor.channelRead: when the server channel was initialized, a ServerBootstrapAcceptor was automatically added to its pipeline.
The ServerBootstrapAcceptor.channelRead method:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// this cast is safe because doReadMessages put exactly this type into the list
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// register the channel with the childGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// The write path
The AbstractChannelHandlerContext.write method:
starting from tail, the first outbound handler in the pipeline is located and invokeWrite(m, promise) is called; this ultimately reaches head, which calls unsafe.write(msg, promise):
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
// ChannelOutboundBuffer maintains an internal Entry linked list, wrapping each msg in an Entry. It keeps three pointers:
// 1. flushedEntry: the first node being written to the OS socket buffer
// 2. unflushedEntry: the first node not yet marked for writing to the OS socket buffer
// 3. tailEntry: the last node of the ChannelOutboundBuffer
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// filter the outgoing message: anything that is not a ByteBuf or FileRegion is rejected, and all non-direct buffers are converted to direct memory (DirectBuffer)
msg = filterOutboundMessage(msg);
// estimate the message size
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// append to the Entry linked list
outboundBuffer.addMessage(msg, size, promise);
}
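From a handler's perspective this split is why write() alone puts nothing on the wire; a minimal usage sketch (a hypothetical echo-style handler, not from the original article):
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoSketchHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);  // only queued in the ChannelOutboundBuffer
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();     // addFlush() + flush0(): now the queued data goes to the socket
    }
}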
// The flush path
This ultimately reaches head, which calls unsafe.flush():
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// mark which entries are to be flushed
outboundBuffer.addFlush();
// write the data out
flush0();
}
The flush0 method:
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there\'s a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) { // OP_WRITE is registered, so the socket is treated as temporarily unwritable. A socket is writable the vast majority of the time; only a full send buffer or network trouble makes it unwritable
// wait until the kernel reports writability again; the selector will then trigger the flush
return;
}
super.flush0();
}
Finally we enter the doWrite method:
NioSocketChannel#doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
// All written so clear OP_WRITE
clearOpWrite();
break;
}
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) { // dispatch on the number of ByteBuffers
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
return;
case 1:
// Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
}
// Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);
if (!done) {
// Did not write all buffers completely.
// incompleteWrite(setOpWrite) registers the OP_WRITE interest on the channel (or schedules another flush task)
incompleteWrite(setOpWrite);
break;
}
}
}
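The spin count in the loop above (config().getWriteSpinCount()) bounds how often a partial write is retried before falling back to incompleteWrite. It can be tuned per child channel; a usage sketch:
// default is 16; raising it trades CPU for fewer OP_WRITE registrations
boot.childOption(ChannelOption.WRITE_SPIN_COUNT, 16);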
javaChannel.write ultimately calls n = IOUtil.write(fd, src, -1, nd):
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
// if it is already a DirectBuffer, write directly: the off-heap data is copied into the kernel buffer and sent
if (var1 instanceof DirectBuffer) {
return writeFromNativeBuffer(var0, var1, var2, var4);
} else {
// not a DirectBuffer
// the current position (start of the readable bytes)
int var5 = var1.position();
// the limit (end of the readable bytes)
int var6 = var1.limit();
assert var5 <= var6;
// allocate a temporary DirectByteBuffer as large as the source buffer's readable region
int var7 = var5 <= var6 ? var6 - var5 : 0;
ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
int var10;
try {
var8.put(var1);
var8.flip();
var1.position(var5);
// write via the temporary DirectBuffer: the off-heap data is copied into the kernel buffer and sent
int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
if (var9 > 0) {
var1.position(var5 + var9);
}
var10 = var9;
} finally {
// return the temporary DirectByteBuffer to the cache
Util.offerFirstTemporaryDirectBuffer(var8);
}
return var10;
}
}
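That temporary-DirectByteBuffer copy is exactly why Netty prefers direct buffers for socket I/O (and why filterOutboundMessage converts heap buffers). A usage sketch from inside a handler:
// allocating a direct buffer up front avoids the heap-to-direct copy inside IOUtil.write
ByteBuf buf = ctx.alloc().directBuffer(256); // pooled direct buffer from the channel's allocator
buf.writeBytes("hello".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf); // the pipeline releases the buffer once it has been written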
// References:
// https://blog.csdn.net/lblblbl...
// https://www.cnblogs.com/java-chen-hao/p/11477384.html
// https://blog.csdn.net/TheLudlows/article/details/83997280
// Object pooling: https://www.jianshu.com/p/854...
// Memory allocation:
// https://blog.csdn.net/pentium...
// https://www.cnblogs.com/ricki...
// FastThreadLocal: https://www.jianshu.com/p/14f...