netty源码之断开连接
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之断开连接相关的知识,希望对你有一定的参考价值。
目录
2、doClose0调用jdk的close 关闭channel
3、fireChannelInactiveAndDeregister 传播关闭、解绑定事件
4、doDeregister取消selector上的selectorKey
一、前言
关闭的逻辑是在read方法里 , 当接受的数据<0时,说明没有可接受的数据了,可以关闭了。
二、正常关闭
1.allocHandle.lastBytesRead() <= 0
这一行会发现数据小于零,释放byteBuf
2.进入到closeOnRead(pipeline);
完成关闭事件
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 内存分配器 final ByteBufAllocator allocator = config.getAllocator(); // 接收数据测handler final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 分配内存是自适应的 byteBuf = allocHandle.allocate(allocator); // 开始读取数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) {// 读取为-1,表示断开连接 // nothing was read. release the buffer. byteBuf.release();//释放byteBuf的空间 byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1);//读取一次数据 readPending = false; pipeline.fireChannelRead(byteBuf);// 将读到的数据传递出去 byteBuf = null; } while (allocHandle.continueReading());//继续读取 // 通过当前 allocHandle.readComplete();//计算下一次的需要分配的空间 pipeline.fireChannelReadComplete();// 将完成读取的事件传递出去 if (close) { closeOnRead(pipeline);//关闭 } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
继续跟进核心的方法在AbstractChannel.close方法里
private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; }
if (closeInitiated) { if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise. // This means close() was called before so we just register a listener and return closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.setSuccess(); } }); } return; } closeInitiated = true; final boolean wasActive = isActive(); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // outboundBuffer置空 this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = prepareToClose();// 关闭之前将数据发送完成之前的一些操作 if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. doClose0(promise); } finally { // Call invokeLater so closeAndDeregister is executed in the EventLoop again! invokeLater(new Runnable() { @Override public void run() { if (outboundBuffer != null) { // Fail all the queued messages outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { try { // Close the channel and fail the queued messages in all cases. doClose0(promise);// 执行close操作 } finally { // 强行关闭outboundBuffer if (outboundBuffer != null) { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } } if (inFlush0) { invokeLater(new Runnable() { @Override public void run() { fireChannelInactiveAndDeregister(wasActive); } }); } else { // 将channel不活跃的事件传播出去 fireChannelInactiveAndDeregister(wasActive); } } }
1、关闭前的处理工作
1.首先判断是否需要阻塞,先将数据发送完成 2.doDeregister()
使得所有的selectorKey失效,防止关闭的时候注册事件到selector上 3.返回一个GlobalEventExecutor.INSTANCE
线程池执行任务
2、doClose0调用jdk的close 关闭channel
3、fireChannelInactiveAndDeregister 传播关闭、解绑定事件
1.cancel所有的selectKey 2.pipeline.fireChannelInactive()
将close事件传播出去 3.pipeline也取消注册
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) { return; } if (!registered) { safeSetSuccess(promise); return; } // As a user may call deregister() from within any method while doing processing in the ChannelPipeline, // we need to ensure we do the actual deregister operation later. This is needed as for example, // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay, // the deregister operation this could lead to have a handler invoked by different EventLoop and so // threads. // // See: // https://github.com/netty/netty/issues/4435 invokeLater(new Runnable() { @Override public void run() { try { doDeregister();// cancel所有的selectKey } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { if (fireChannelInactive) { // 最后将这个关闭事件传播出去 pipeline.fireChannelInactive(); } // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, // close() calls deregister() again - no need to fire channelUnregistered, so check // if it was registered. if (registered) { registered = false; // pipeline也取消注册 pipeline.fireChannelUnregistered(); } safeSetSuccess(promise); } } }); }
4、doDeregister取消selector上的selectorKey
它本质上就是取消了selector上的所有的selectorKey,但是做出了一些优化
三、异常关闭
进入AbstractNioByteChannel
,其中的read方法为例,假设正在读取数据的时候突然抛出异常,会被handleReadException(pipeline, byteBuf, t, close, allocHandle);
处理,并关闭channel。
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 内存分配器 final ByteBufAllocator allocator = config.getAllocator(); // 接收数据测handler final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 分配内存是自适应的 byteBuf = allocHandle.allocate(allocator); // 开始读取数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) {// 读取为-1,表示断开连接 // nothing was read. release the buffer. byteBuf.release();//释放byteBuf的空间 byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1);//读取一次数据 readPending = false; pipeline.fireChannelRead(byteBuf);// 将读到的数据传递出去 byteBuf = null; } while (allocHandle.continueReading());//继续读取 // 通过当前 allocHandle.readComplete();//计算下一次的需要分配的空间 pipeline.fireChannelReadComplete();// 将完成读取的事件传递出去 if (close) { closeOnRead(pipeline);//关闭 } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
四、总结
以上是关于netty源码之断开连接的主要内容,如果未能解决你的问题,请参考以下文章