Netty Source Code: Closing a Connection
Posted by better_hui
Contents
2. doClose0 calls the JDK close to close the channel
3. fireChannelInactiveAndDeregister propagates the inactive and deregister events
4. doDeregister cancels the SelectionKey on the selector
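I. Preface
The close logic lives in the read method. When the number of bytes read is less than 0 (the underlying read returned -1, i.e. EOF), the peer has closed the connection and there is nothing more to receive, so the channel can be closed.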
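The "-1 means EOF" contract comes from the JDK channel API that Netty builds on. The following is a minimal sketch of that contract, assuming a java.nio.channels.Pipe as a stand-in for a socket (the class name EofReadDemo and its methods are hypothetical, chosen only for illustration): once the writing side is closed and the buffered bytes are drained, read() returns -1.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class EofReadDemo {
    /** Reads until EOF and returns every value that read() returned along the way. */
    static List<Integer> readUntilEof(ReadableByteChannel ch) throws IOException {
        List<Integer> results = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.allocate(64);
        while (true) {
            buf.clear();
            int n = ch.read(buf);
            results.add(n);
            if (n < 0) {
                // -1: end of stream, the other side is gone
                return results;
            }
        }
    }

    /** Writes 5 bytes, closes the writing side, then reads everything back. */
    static List<Integer> demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));
            pipe.sink().close(); // the writing side "disconnects"
            try {
                return readUntilEof(pipe.source());
            } finally {
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The last element of the printed list is the EOF marker -1.
        System.out.println(demo());
    }
}
```

A socket behaves the same way from the reader's point of view: a negative return value is the only signal that the peer closed in an orderly fashion.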
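II. Normal Close
1. allocHandle.lastBytesRead() <= 0
When this check finds that nothing was read (0 bytes, or -1 for EOF), the byteBuf is released. A negative value additionally sets the close flag.
2. Execution then enters closeOnRead(pipeline);
which carries out the close.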
@Override
public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    // Memory allocator
    final ByteBufAllocator allocator = config.getAllocator();
    // Handle that tracks how much data is received
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // The buffer size is chosen adaptively
            byteBuf = allocHandle.allocate(allocator);
            // Start reading data
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // A read of -1 means the connection was closed
                // nothing was read. release the buffer.
                byteBuf.release(); // release the byteBuf's memory
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1); // one more read performed
            readPending = false;
            pipeline.fireChannelRead(byteBuf); // pass the data that was read down the pipeline
            byteBuf = null;
        } while (allocHandle.continueReading()); // keep reading

        allocHandle.readComplete(); // work out how much to allocate next time
        pipeline.fireChannelReadComplete(); // propagate the read-complete event

        if (close) {
            closeOnRead(pipeline); // close
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
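The difference between a read of 0 and a read of -1 is easy to miss in the listing above. The sketch below is a simplified model of the loop, not Netty's code: the class ReadLoopSketch, the scripted read results, and the maxMessagesPerRead cap (standing in for allocHandle.continueReading()) are all assumptions made for illustration.

```java
class ReadLoopSketch {
    /** Result of one read() pass: buffers passed on, and whether to close afterwards. */
    static final class Outcome {
        final int messagesRead;
        final boolean close;

        Outcome(int messagesRead, boolean close) {
            this.messagesRead = messagesRead;
            this.close = close;
        }

        @Override
        public String toString() {
            return "messagesRead=" + messagesRead + ", close=" + close;
        }
    }

    /**
     * Replays scripted doReadBytes() results through the loop's decisions.
     * Once the script is exhausted, further reads are treated as returning 0.
     */
    static Outcome readOnce(int[] scriptedReads, int maxMessagesPerRead) {
        int messages = 0;
        boolean close = false;
        int i = 0;
        do {
            int lastBytesRead = i < scriptedReads.length ? scriptedReads[i++] : 0;
            if (lastBytesRead <= 0) {
                // 0: nothing available right now; -1: EOF, so the channel must close
                close = lastBytesRead < 0;
                break;
            }
            messages++; // corresponds to incMessagesRead(1) + fireChannelRead(byteBuf)
        } while (messages < maxMessagesPerRead); // simplified continueReading()
        return new Outcome(messages, close);
    }

    public static void main(String[] args) {
        System.out.println(readOnce(new int[] {10, 0}, 16));      // stops without closing
        System.out.println(readOnce(new int[] {10, 20, -1}, 16)); // stops and closes
    }
}
```

In this model a read of 0 only ends the current pass, while -1 ends the pass and requests the close; if the cap is reached before the -1 is seen, the EOF is simply picked up by a later pass.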
Following the call chain further, the core logic is in the close method of AbstractChannel (in its inner class AbstractUnsafe):
private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
    if (!promise.setUncancellable()) {
        return;
    }

    if (closeInitiated) {
        if (closeFuture.isDone()) {
            // Closed already.
            safeSetSuccess(promise);
        } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
            // This means close() was called before so we just register a listener and return
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    promise.setSuccess();
                }
            });
        }
        return;
    }

    closeInitiated = true;

    final boolean wasActive = isActive();
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    // Null out outboundBuffer
    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
    Executor closeExecutor = prepareToClose(); // preparation before closing, e.g. letting pending data be sent
    if (closeExecutor != null) {
        closeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Execute the close.
                    doClose0(promise);
                } finally {
                    // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            if (outboundBuffer != null) {
                                // Fail all the queued messages
                                outboundBuffer.failFlushed(cause, notify);
                                outboundBuffer.close(closeCause);
                            }
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                }
            }
        });
    } else {
        try {
            // Close the channel and fail the queued messages in all cases.
            doClose0(promise); // perform the close
        } finally {
            // Forcibly close the outboundBuffer
            if (outboundBuffer != null) {
                // Fail all the queued messages.
                outboundBuffer.failFlushed(cause, notify);
                outboundBuffer.close(closeCause);
            }
        }
        if (inFlush0) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            });
        } else {
            // Propagate the channel-inactive event
            fireChannelInactiveAndDeregister(wasActive);
        }
    }
}
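The closeInitiated guard at the top of close(...) is what makes repeated close calls harmless. The following is a minimal single-threaded sketch of that pattern using CompletableFuture in place of Netty's promises; the class IdempotentCloseSketch and all of its members are hypothetical and are not part of Netty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class IdempotentCloseSketch {
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    // Stand-in for the outbound buffer; null once close has started.
    private Queue<CompletableFuture<Void>> pendingWrites = new ArrayDeque<>();
    private boolean closeInitiated;
    int closeCount;                               // how often the real close ran
    final List<String> events = new ArrayList<>();

    /** Queues a write; fails it immediately if close has already started. */
    CompletableFuture<Void> write() {
        CompletableFuture<Void> p = new CompletableFuture<>();
        if (pendingWrites == null) {
            p.completeExceptionally(new IllegalStateException("channel closed"));
        } else {
            pendingWrites.add(p);
        }
        return p;
    }

    void close(CompletableFuture<Void> promise) {
        if (closeInitiated) {
            // A close is already under way or done: just follow its future.
            closeFuture.whenComplete((v, t) -> promise.complete(null));
            return;
        }
        closeInitiated = true;
        Queue<CompletableFuture<Void>> writes = pendingWrites;
        pendingWrites = null;                     // disallow further writes
        try {
            closeCount++;                         // stands in for doClose0(promise)
            closeFuture.complete(null);
            promise.complete(null);
        } finally {
            // Fail everything that was still queued, then report inactivity.
            for (CompletableFuture<Void> w : writes) {
                w.completeExceptionally(new IllegalStateException("channel closed"));
            }
            events.add("channelInactive");
        }
    }

    public static void main(String[] args) {
        IdempotentCloseSketch ch = new IdempotentCloseSketch();
        CompletableFuture<Void> write = ch.write();
        ch.close(new CompletableFuture<>());
        ch.close(new CompletableFuture<>());      // second call does no extra work
        System.out.println("closeCount=" + ch.closeCount
                + ", writeFailed=" + write.isCompletedExceptionally()
                + ", events=" + ch.events);
    }
}
```

The sketch leaves out everything related to threads (prepareToClose, invokeLater, inFlush0); it only illustrates the "first caller closes, later callers wait on closeFuture" idea.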
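1. Pre-close handling
prepareToClose() does the following before the actual close:
1. It first checks whether the close may block because pending data has to be sent first (the channel is open and SO_LINGER is set to a positive value).
2. If so, it calls doDeregister(), which invalidates the channel's SelectionKey so that no events are registered on, or delivered through, the selector while the close is in progress.
3. It returns GlobalEventExecutor.INSTANCE, and the close task then runs on that executor instead of on the EventLoop thread.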
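The decision described in these steps can be sketched as follows. This is a simplified model under stated assumptions, not Netty's implementation: PrepareToCloseSketch, the "close-helper" thread, and the method parameters are hypothetical, and a plain single-thread executor stands in for GlobalEventExecutor.INSTANCE.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PrepareToCloseSketch {
    /**
     * Returns the executor the close should run on, or null to close inline.
     * A positive linger time means close() may block, so it is moved elsewhere.
     */
    static Executor prepareToClose(boolean open, int soLingerSeconds, Executor blockingExecutor) {
        if (open && soLingerSeconds > 0) {
            // Netty would also call doDeregister() here before handing off.
            return blockingExecutor;
        }
        return null;
    }

    /** Reports the name of the thread that would end up running the close. */
    static String closeThreadName(int soLingerSeconds) {
        ExecutorService helper =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "close-helper"));
        try {
            Executor closeExecutor = prepareToClose(true, soLingerSeconds, helper);
            if (closeExecutor == null) {
                return Thread.currentThread().getName(); // close inline on the caller
            }
            Callable<String> closeTask = () -> Thread.currentThread().getName();
            return helper.submit(closeTask).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            helper.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("linger=0 -> " + closeThreadName(0));
        System.out.println("linger=5 -> " + closeThreadName(5));
    }
}
```

The point of the hand-off is that a potentially blocking close must not stall the thread that serves every other channel on the same event loop.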
2. doClose0 calls the JDK close to close the channel
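What the JDK close does to a registered channel can be observed without Netty. The sketch below assumes a Pipe source channel as a stand-in for a socket channel (JdkCloseDemo and its method are hypothetical names): after close(), the channel is no longer open and its SelectionKey is no longer valid.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class JdkCloseDemo {
    /** Returns {openBefore, keyValidBefore, openAfter, keyValidAfter}. */
    static boolean[] closeRegisteredChannel() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                Pipe.SourceChannel ch = pipe.source();
                ch.configureBlocking(false);          // required before register()
                SelectionKey key = ch.register(selector, SelectionKey.OP_READ);
                boolean openBefore = ch.isOpen();
                boolean validBefore = key.isValid();
                ch.close();                           // the JDK-level close
                return new boolean[] {openBefore, validBefore, ch.isOpen(), key.isValid()};
            } finally {
                pipe.source().close();                // closing twice is harmless
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = closeRegisteredChannel();
        System.out.println("before: open=" + r[0] + ", keyValid=" + r[1]);
        System.out.println("after:  open=" + r[2] + ", keyValid=" + r[3]);
    }
}
```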
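3. fireChannelInactiveAndDeregister propagates the inactive and deregister events
fireChannelInactiveAndDeregister delegates to deregister(...), which does three things:
1. doDeregister() cancels the channel's SelectionKey.
2. pipeline.fireChannelInactive() propagates the close (channelInactive) event through the pipeline.
3. pipeline.fireChannelUnregistered() tells the pipeline that the channel is no longer registered.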
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    if (!promise.setUncancellable()) {
        return;
    }

    if (!registered) {
        safeSetSuccess(promise);
        return;
    }

    // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
    // we need to ensure we do the actual deregister operation later. This is needed as for example,
    // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
    // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
    // the deregister operation this could lead to have a handler invoked by different EventLoop and so
    // threads.
    //
    // See:
    // https://github.com/netty/netty/issues/4435
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                doDeregister(); // cancel the channel's SelectionKey
            } catch (Throwable t) {
                logger.warn("Unexpected exception occurred while deregistering a channel.", t);
            } finally {
                if (fireChannelInactive) {
                    // Finally propagate the close event
                    pipeline.fireChannelInactive();
                }
                // Some transports like local and AIO does not allow the deregistration of
                // an open channel. Their doDeregister() calls close(). Consequently,
                // close() calls deregister() again - no need to fire channelUnregistered, so check
                // if it was registered.
                if (registered) {
                    registered = false;
                    // The pipeline is told about the deregistration as well
                    pipeline.fireChannelUnregistered();
                }
                safeSetSuccess(promise);
            }
        }
    });
}
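Two properties of deregister(...) are worth making concrete: the work is deferred through invokeLater, and channelInactive is fired before channelUnregistered. The sketch below models this with a plain task queue; DeregisterOrderSketch, its queue, and the event strings are hypothetical and only approximate the behavior of the listing above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class DeregisterOrderSketch {
    final Queue<Runnable> eventLoopTasks = new ArrayDeque<>(); // stand-in for invokeLater
    final List<String> events = new ArrayList<>();
    boolean registered = true;

    void deregister(boolean fireChannelInactive) {
        if (!registered) {
            events.add("promise:success");
            return;
        }
        // Deferred: the caller may still be in the middle of pipeline processing.
        eventLoopTasks.add(() -> {
            try {
                events.add("doDeregister");
            } finally {
                if (fireChannelInactive) {
                    events.add("channelInactive");
                }
                if (registered) {
                    registered = false;
                    events.add("channelUnregistered");
                }
                events.add("promise:success");
            }
        });
    }

    /** Runs everything that was queued, as the event loop would on its next turn. */
    void runPendingTasks() {
        Runnable task;
        while ((task = eventLoopTasks.poll()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        DeregisterOrderSketch ch = new DeregisterOrderSketch();
        ch.deregister(true);
        ch.events.add("caller returns"); // the calling handler finishes first
        ch.runPendingTasks();
        return ch.events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

For a handler author, the ordering means channelInactive is the place to release per-connection resources, and channelUnregistered arrives only afterwards.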
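4. doDeregister cancels the SelectionKey on the selector
In essence it cancels the channel's SelectionKey on the selector, with one optimization: the event loop counts the cancelled keys and, once enough of them have accumulated, selects again so that the stale keys are cleaned up.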
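At the JDK level, cancelling a key differs from closing the channel: the key becomes invalid at once, the channel stays open, and the selector drops the key on its next selection operation. A minimal sketch of that behavior, again assuming a Pipe source channel as a stand-in for a socket channel (KeyCancelDemo is a hypothetical name):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class KeyCancelDemo {
    /** Returns {keyValidAfterCancel, channelOpenAfterCancel, keyInSelectorAfterSelect}. */
    static boolean[] cancelKey() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                Pipe.SourceChannel ch = pipe.source();
                ch.configureBlocking(false);
                SelectionKey key = ch.register(selector, SelectionKey.OP_READ);

                key.cancel();                        // what a deregistration boils down to
                boolean validAfterCancel = key.isValid();
                boolean openAfterCancel = ch.isOpen();

                selector.selectNow();                // cancelled keys are flushed here
                boolean inKeySetAfterSelect = selector.keys().contains(key);

                return new boolean[] {validAfterCancel, openAfterCancel, inKeySetAfterSelect};
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = cancelKey();
        System.out.println("keyValid=" + r[0] + ", channelOpen=" + r[1]
                + ", keyStillInSelector=" + r[2]);
    }
}
```

Because cancelled keys linger until the next selection operation, it makes sense for an event loop to force a fresh select after many cancellations rather than keep iterating over stale keys.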
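III. Abnormal Close
Take the read method of AbstractNioByteChannel (the same method listed in the normal-close section) as an example. If an exception is thrown while data is being read, it is caught and handled by handleReadException(pipeline, byteBuf, t, close, allocHandle), which propagates the exception through the pipeline and closes the channel where appropriate, for example on an I/O error.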
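The abnormal path can be modelled in a few lines. The sketch below is not Netty's handleReadException; it is a simplified model that assumes the rule "fire exceptionCaught, then close if EOF was already seen or the cause is an IOException". ReadErrorSketch, ByteSource, and the event strings are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class ReadErrorSketch {
    /** Stand-in for doReadBytes(byteBuf): returns a byte count or throws. */
    interface ByteSource {
        int read() throws IOException;
    }

    /** Performs one simplified read pass and records the resulting events. */
    static List<String> readOnce(ByteSource source) {
        List<String> events = new ArrayList<>();
        boolean close = false;
        try {
            int n = source.read();
            if (n < 0) {
                close = true;                 // EOF: normal close path
            } else if (n > 0) {
                events.add("channelRead");
            }
            events.add("channelReadComplete");
            if (close) {
                events.add("close");
            }
        } catch (Throwable t) {
            // Abnormal path (assumed rule, see the lead-in)
            events.add("channelReadComplete");
            events.add("exceptionCaught:" + t.getClass().getSimpleName());
            if (close || t instanceof IOException) {
                events.add("close");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(readOnce(() -> 8));
        System.out.println(readOnce(() -> -1));
        System.out.println(readOnce(() -> {
            throw new IOException("connection reset");
        }));
    }
}
```

In both the normal and the abnormal case the pipeline sees the same final close, but only the abnormal case delivers an exceptionCaught event first.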
IV. Summary