netty源码之写数据

Posted better_hui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之写数据相关的知识,希望对你有一定的参考价值。

目录

前言

和快递比较一下

几种场景

发送数据的分类

写数据

flush数据


前言

write 是把数据写到buf里

flush 是将数据发出去

writeAndFlush 写入数据到buf 并立刻发送出去

和快递比较一下

 

几种场景

1、netty协数据,写不进去了 , 会停止写 , 然后注册一个OP_WRITE事件 , 来通知什么时候可以写进去了再写

2、netty批量写数据时,如果想写的都写完了, 接下来会尝试写更多 , 调整maxBytesPerGatheringWrite

3、netty如果一直有数据要写 , 会一直尝试着写 , 直到写不出去或者满16次 (writeSpinCount)

4、等待写的数据太多,超过了阈值(writeBufferWaterMark.high()) ,将可写标志位改成false , 让应用端决定要不要继续写。

发送数据的分类

1.unflushedEntry所指向的entry,每次向链表中添加数据写到链表尾部

2.写好一份完整的数据以后就将unflushedEntry这个头结点变成flushedEntry

3.最后写入数据就是从flushedEntry开始遍历,写一个数据删除一个节点,继续写入下一个,正在发送数据的entry标记为progress

 

写数据

在我们读取数据的方法里,会写数据到buf

AbstractChannelHandlerContext.write方法

private void write(Object msg, boolean flush, ChannelPromise promise) 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) 
        if (flush) 
            next.invokeWriteAndFlush(m, promise);
         else 
            next.invokeWrite(m, promise);
        
     else 
        AbstractWriteTask task;
        if (flush) 
            task = WriteAndFlushTask.newInstance(next, m, promise);
          else 
            task = WriteTask.newInstance(next, m, promise);
        
        safeExecute(executor, task, promise, m);
    

注意写数据是从tail -> head的流转 , 所以最终的逻辑在head里

public final void write(Object msg, ChannelPromise promise) 
    assertEventLoop();
​
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) 
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    
​
    int size;
    try 
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) 
            size = 0;
        
     catch (Throwable t) 
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    
​
    outboundBuffer.addMessage(msg, size, promise);

ChannelOutboundBuffer.addMessage

public void addMessage(Object msg, int size, ChannelPromise promise) 
    //封装成一个entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) 
        flushedEntry = null;
        tailEntry = entry;
     else 
        //放到队尾
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    
    if (unflushedEntry == null) 
        unflushedEntry = entry;
    
    // 判断是否还可以发送数据
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);

flush数据

刷新数据是将数据写出去给客户端,这个flush的触发点,是在业务逻辑的channelReadComplete方法里,这个方法是一次读事件完全处理完后才会触发的。

AbstractChannelHandlerContext.flush()

public ChannelHandlerContext flush() 
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) 
        //执行刷新
        next.invokeFlush();
     else 
        Runnable task = next.invokeFlushTask;
        if (task == null) 
            next.invokeFlushTask = task = new Runnable() 
                @Override
                public void run() 
                    next.invokeFlush();
                
            ;
        
        safeExecute(executor, task, channel().voidPromise(), null);
    
​
    return this;

DefaultChannelPipeline.HeadContext.flush

public void flush(ChannelHandlerContext ctx) throws Exception 
    unsafe.flush();

AbstractChannel.flush0()

protected void flush0() 
    if (inFlush0) 
        // Avoid re-entrance
        return;
    
​
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) 
        return;
    
​
    inFlush0 = true;
​
    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) 
        try 
            if (isOpen()) 
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
             else 
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            
         finally 
            inFlush0 = false;
        
        return;
    
​
    try 
        //写数据的核心
        doWrite(outboundBuffer);
     catch (Throwable t) 
        if (t instanceof IOException && config().isAutoClose()) 
            /**
             * Just call @link #close(ChannelPromise, Throwable, boolean) here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise @link #isActive() , @link #isOpen() and @link #isWritable()
             * may still return @code true even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
         else 
            outboundBuffer.failFlushed(t, true);
        
     finally 
        inFlush0 = false;
    

// 开始写数据
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception 
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();// 尝试写16次数据。为了保证其他线程占用CPU执行任务,而不会一直处于阻塞状态
    do 
        // 判断数据是否写完
        if (in.isEmpty()) 
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        
​
        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NiosocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        // 将需要写的数据变成ByteBuffer写出去
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);// 1024表示数据个数,maxBytesPerGatheringWrite表示最大发送的数据的字节数
        int nioBufferCnt = in.nioBufferCount();
​
        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        // 开始发送数据
        switch (nioBufferCnt) 
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: 
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);//数据个数为1时,直接使用了jdk的byteBuf写数据的方式
                if (localWrittenBytes <= 0) 
                    incompleteWrite(true);
                    return;
                
                // 尝试写的数据量和实际写入的数据量对比,调整每次写入的数据量
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);//buffer清空写出完成的数据的位置
                --writeSpinCount;//写的次数减少一次
                break;
            
            default: 
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//批量写数据的做出了优化
                if (localWrittenBytes <= 0) 
                    incompleteWrite(true);
                    return;
                
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            
        
     while (writeSpinCount > 0);
​
    // 判断上一次数据是否写出完成,没有完成就shchdule一个新的write的task
    incompleteWrite(writeSpinCount < 0);

以上是关于netty源码之写数据的主要内容,如果未能解决你的问题,请参考以下文章

netty源码之写数据

linux源码解析10–缺页异常之写时复制

Netty源码:2 把握 Netty 整体架构脉络

大数据成神之路-Netty(源码解析篇)

Netty核心技术及源码剖析-Netty入站与出站机制

攻读netty源码,解码器