Netty Source Code: Writing Data

Posted by better_hui


Contents

Preface

An analogy with express delivery

Several scenarios

How outgoing data is organized

Writing data

Flushing data


Preface

write copies data into the channel's outbound buffer; nothing is sent yet.

flush actually sends the buffered data out.

writeAndFlush writes data into the buffer and immediately sends it out.
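As a quick illustration, a handler can choose between buffering and sending immediately. This is a minimal sketch (the handler class is made up for illustration; ctx.write, ctx.flush, and ctx.writeAndFlush are the real Netty APIs):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class EchoHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // write() only queues the message in the ChannelOutboundBuffer;
        // nothing reaches the socket yet
        ctx.write(msg);
        // ctx.writeAndFlush(msg) would queue and send in one call
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // flush() pushes everything queued so far onto the socket
        ctx.flush();
    }
}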

An analogy with express delivery

Calling write is like dropping a parcel off at the courier depot: the parcel is registered and queued, but it has not left yet. Calling flush is like the truck departing with everything queued so far. writeAndFlush drops the parcel off and has it shipped in one step.

Several scenarios

1. When Netty writes data and the socket cannot accept any more, it stops writing and registers an OP_WRITE event so that it is notified once the socket becomes writable again.

2. When Netty writes a batch of data and everything it attempted went through, it will try to write more next time by enlarging maxBytesPerGatheringWrite.

3. If there is always more data to write, Netty keeps trying until either nothing more can be written or it has made 16 attempts (writeSpinCount).

4. If too much data is waiting to be written and the total exceeds the high water mark (writeBufferWaterMark.high()), the channel's writable flag is set to false, and the application decides whether to keep writing; see the sketch after this list.
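Scenario 4 can be observed from user code through the writability callbacks. A minimal sketch, assuming an application-specific producer that can be paused and resumed (the pause/resume methods are placeholders; Channel.isWritable() and channelWritabilityChanged() are the real Netty APIs):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BackpressureAwareHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            // Pending bytes dropped below writeBufferWaterMark.low():
            // safe to resume producing data
            resumeProducing();
        } else {
            // Pending bytes exceeded writeBufferWaterMark.high():
            // stop producing until the buffer drains
            pauseProducing();
        }
        ctx.fireChannelWritabilityChanged();
    }

    private void resumeProducing() { /* application-specific */ }

    private void pauseProducing() { /* application-specific */ }
}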

How outgoing data is organized

ChannelOutboundBuffer keeps pending messages in a singly linked list of Entry nodes:

1. New messages are appended at the tail of the list; unflushedEntry points to the first entry that has not been flushed yet.

2. Once a complete batch has been written, flush turns the unflushedEntry head into flushedEntry.

3. The actual send then walks the list starting from flushedEntry: each fully written entry is removed and the next one is sent, while the entry currently in flight records its progress. The pointer layout is sketched below.
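A rough illustration of the three pointers (the entry names E1..E6 are made up; flushedEntry, unflushedEntry and tailEntry are the real ChannelOutboundBuffer fields):

flushedEntry          unflushedEntry        tailEntry
     |                      |                   |
     v                      v                   v
    E1 ---> E2 ---> E3 --> E4 ---> E5 -------> E6
    |<-- flushed, being -->|<--- written but -->|
        sent to socket         not yet flushed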

 

Writing data

In the handler that reads data, we write data into the outbound buffer. The journey begins in AbstractChannelHandlerContext.write:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // Find the next outbound handler in the pipeline (towards the head)
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Already on the event loop: invoke directly
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // Called from another thread: wrap the write in a task and
        // hand it over to the channel's event loop
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}
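This dispatch is also why writing from a non-I/O thread is safe: instead of touching the socket concurrently, the write is wrapped in a task and serialized onto the channel's event loop.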

Note that outbound events flow from tail to head through the pipeline, so the final logic lives in the head context, which delegates to the channel's Unsafe. AbstractChannel.AbstractUnsafe.write:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}
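Two details worth noting here: filterOutboundMessage is where the NIO transport normalizes messages (for example, copying a heap ByteBuf into a direct buffer), and estimatorHandle().size(msg) estimates the message's memory footprint, which feeds the water-mark accounting behind the writable flag.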

ChannelOutboundBuffer.addMessage

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // Wrap the message in an Entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        // Append at the tail of the linked list
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // Check whether the channel should still be considered writable.
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}
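incrementPendingOutboundBytes is what flips the writable flag from scenario 4. A simplified sketch of its logic (paraphrased from the Netty 4.1 source; treat it as a sketch, not the verbatim implementation):

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // Atomically add the new message's footprint to the total pending bytes
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // Crossing the high water mark marks the channel unwritable and
    // eventually fires channelWritabilityChanged
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}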

Flushing data

Flushing actually sends the buffered data out to the peer. A typical trigger point is the handler's channelReadComplete method, which fires only after a read event has been fully processed.

AbstractChannelHandlerContext.flush()

public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Already on the event loop: flush directly
        next.invokeFlush();
    } else {
        // Lazily create and cache the flush task, then run it on the event loop
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }

    return this;
}

DefaultChannelPipeline.HeadContext.flush

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

Before flush0 runs, unsafe.flush() first calls outboundBuffer.addFlush(), which marks the queued entries as flushed (unflushedEntry becomes flushedEntry), and then invokes flush0. AbstractChannel.AbstractUnsafe.flush0():

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        // The core of actually writing the data out
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

NioSocketChannel.doWrite:

// Start writing the data
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // Try at most 16 attempts (writeSpinCount) so that one busy channel
    // cannot monopolize the event loop and starve other tasks
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // Check whether all data has been written
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        // Convert the pending data into ByteBuffers: at most 1024 buffers and
        // at most maxBytesPerGatheringWrite bytes in total
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        // Start sending the data
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                // A single buffer: plain JDK ByteBuffer write
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    // The socket send buffer is full: register OP_WRITE and stop
                    incompleteWrite(true);
                    return;
                }
                // Compare the attempted and actually written byte counts and
                // adjust how much is attempted per gathering write
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // Remove the bytes that were written from the outbound buffer
                in.removeBytes(localWrittenBytes);
                --writeSpinCount; // one write attempt used
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                // Multiple buffers: use the optimized gathering write
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    // If the data was not completely written, either register OP_WRITE or
    // schedule a new flush task (see incompleteWrite below)
    incompleteWrite(writeSpinCount < 0);
}
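incompleteWrite then decides how to resume. It looks roughly like the following (paraphrased from the Netty 4.1 source; flushTask is a cached Runnable field that re-invokes flush):

protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        // The socket send buffer is full: register OP_WRITE so the selector
        // wakes us up once the channel becomes writable again
        setOpWrite();
    } else {
        // There is data left but the write attempts are used up: clear
        // OP_WRITE and schedule another flush so that other tasks on the
        // event loop get a chance to run in between
        clearOpWrite();
        eventLoop().execute(flushTask);
    }
}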
