netty源码之写数据
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之写数据相关的知识,希望对你有一定的参考价值。
目录
前言
write 是把数据写到buf里
flush 是将数据发出去
writeAndFlush 写入数据到buf 并立刻发送出去
和快递比较一下
几种场景
1、netty协数据,写不进去了 , 会停止写 , 然后注册一个OP_WRITE事件 , 来通知什么时候可以写进去了再写
2、netty批量写数据时,如果想写的都写完了, 接下来会尝试写更多 , 调整maxBytesPerGatheringWrite
3、netty如果一直有数据要写 , 会一直尝试着写 , 直到写不出去或者满16次 (writeSpinCount)
4、等待写的数据太多,超过了阈值(writeBufferWaterMark.high()) ,将可写标志位改成false , 让应用端决定要不要继续写。
发送数据的分类
1.unflushedEntry所指向的entry,每次向链表中添加数据写到链表尾部
2.写好一份完整的数据以后就将unflushedEntry这个头结点变成flushedEntry
3.最后写入数据就是从flushedEntry开始遍历,写一个数据删除一个节点,继续写入下一个,正在发送数据的entry标记为progress
写数据
在我们读取数据的方法里,会写数据到buf
AbstractChannelHandlerContext.write方法
private void write(Object msg, boolean flush, ChannelPromise promise) AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) if (flush) next.invokeWriteAndFlush(m, promise); else next.invokeWrite(m, promise); else AbstractWriteTask task; if (flush) task = WriteAndFlushTask.newInstance(next, m, promise); else task = WriteTask.newInstance(next, m, promise); safeExecute(executor, task, promise, m);
注意写数据是从tail -> head的流转 , 所以最终的逻辑在head里
public final void write(Object msg, ChannelPromise promise) assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; int size; try msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) size = 0; catch (Throwable t) safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; outboundBuffer.addMessage(msg, size, promise);
ChannelOutboundBuffer.addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) //封装成一个entry Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) flushedEntry = null; tailEntry = entry; else //放到队尾 Entry tail = tailEntry; tail.next = entry; tailEntry = entry; if (unflushedEntry == null) unflushedEntry = entry; // 判断是否还可以发送数据 // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false);
flush数据
刷新数据是将数据写出去给客户端,这个flush的触发点,是在业务逻辑的channelReadComplete方法里,这个方法是一次读事件完全处理完后才会触发的。
AbstractChannelHandlerContext.flush()
public ChannelHandlerContext flush() final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) //执行刷新 next.invokeFlush(); else Runnable task = next.invokeFlushTask; if (task == null) next.invokeFlushTask = task = new Runnable() @Override public void run() next.invokeFlush(); ; safeExecute(executor, task, channel().voidPromise(), null); return this;
DefaultChannelPipeline.HeadContext.flush
public void flush(ChannelHandlerContext ctx) throws Exception unsafe.flush();
AbstractChannel.flush0()
protected void flush0() if (inFlush0) // Avoid re-entrance return; final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) return; inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) try if (isOpen()) outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); else // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); finally inFlush0 = false; return; try //写数据的核心 doWrite(outboundBuffer); catch (Throwable t) if (t instanceof IOException && config().isAutoClose()) /** * Just call @link #close(ChannelPromise, Throwable, boolean) here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise @link #isActive() , @link #isOpen() and @link #isWritable() * may still return @code true even if the channel should be closed as result of the exception. */ close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); else outboundBuffer.failFlushed(t, true); finally inFlush0 = false;
// 开始写数据 @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount();// 尝试写16次数据。为了保证其他线程占用CPU执行任务,而不会一直处于阻塞状态 do // 判断数据是否写完 if (in.isEmpty()) // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NiosocketChannelConfig) config).getMaxBytesPerGatheringWrite(); // 将需要写的数据变成ByteBuffer写出去 ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);// 1024表示数据个数,maxBytesPerGatheringWrite表示最大发送的数据的字节数 int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 // 开始发送数据 switch (nioBufferCnt) case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer);//数据个数为1时,直接使用了jdk的byteBuf写数据的方式 if (localWrittenBytes <= 0) incompleteWrite(true); return; // 尝试写的数据量和实际写入的数据量对比,调整每次写入的数据量 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes);//buffer清空写出完成的数据的位置 --writeSpinCount;//写的次数减少一次 break; default: // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//批量写数据的做出了优化 if (localWrittenBytes <= 0) incompleteWrite(true); return; // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; while (writeSpinCount > 0); // 判断上一次数据是否写出完成,没有完成就shchdule一个新的write的task incompleteWrite(writeSpinCount < 0);
以上是关于netty源码之写数据的主要内容,如果未能解决你的问题,请参考以下文章