netty源码之写数据
Posted better_hui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之写数据相关的知识,希望对你有一定的参考价值。
目录
前言
write 是把数据写到buf里
flush 是将数据发出去
writeAndFlush 写入数据到buf 并立刻发送出去
和快递比较一下
几种场景
1、netty协数据,写不进去了 , 会停止写 , 然后注册一个OP_WRITE事件 , 来通知什么时候可以写进去了再写
2、netty批量写数据时,如果想写的都写完了, 接下来会尝试写更多 , 调整maxBytesPerGatheringWrite
3、netty如果一直有数据要写 , 会一直尝试着写 , 直到写不出去或者满16次 (writeSpinCount)
4、等待写的数据太多,超过了阈值(writeBufferWaterMark.high()) ,将可写标志位改成false , 让应用端决定要不要继续写。
发送数据的分类
1.unflushedEntry所指向的entry,每次向链表中添加数据写到链表尾部
2.写好一份完整的数据以后就将unflushedEntry这个头结点变成flushedEntry
3.最后写入数据就是从flushedEntry开始遍历,写一个数据删除一个节点,继续写入下一个,正在发送数据的entry标记为progress
写数据
在我们读取数据的方法里,会写数据到buf
AbstractChannelHandlerContext.write方法
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
注意写数据是从tail -> head的流转 , 所以最终的逻辑在head里
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
ChannelOutboundBuffer.addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) { //封装成一个entry Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { //放到队尾 Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // 判断是否还可以发送数据 // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false); }
flush数据
刷新数据是将数据写出去给客户端,这个flush的触发点,是在业务逻辑的channelReadComplete方法里,这个方法是一次读事件完全处理完后才会触发的。
AbstractChannelHandlerContext.flush()
public ChannelHandlerContext flush() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //执行刷新 next.invokeFlush(); } else { Runnable task = next.invokeFlushTask; if (task == null) { next.invokeFlushTask = task = new Runnable() { @Override public void run() { next.invokeFlush(); } }; } safeExecute(executor, task, channel().voidPromise(), null); } return this; }
DefaultChannelPipeline.HeadContext.flush
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
AbstractChannel.flush0()
protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { //写数据的核心 doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
// 开始写数据 @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount();// 尝试写16次数据。为了保证其他线程占用CPU执行任务,而不会一直处于阻塞状态 do { // 判断数据是否写完 if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NiosocketChannelConfig) config).getMaxBytesPerGatheringWrite(); // 将需要写的数据变成ByteBuffer写出去 ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);// 1024表示数据个数,maxBytesPerGatheringWrite表示最大发送的数据的字节数 int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 // 开始发送数据 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer);//数据个数为1时,直接使用了jdk的byteBuf写数据的方式 if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // 尝试写的数据量和实际写入的数据量对比,调整每次写入的数据量 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes);//buffer清空写出完成的数据的位置 --writeSpinCount;//写的次数减少一次 break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//批量写数据的做出了优化 if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); // 判断上一次数据是否写出完成,没有完成就shchdule一个新的write的task incompleteWrite(writeSpinCount < 0); }
以上是关于netty源码之写数据的主要内容,如果未能解决你的问题,请参考以下文章