Netty源码分析-ChunkedFile和ChunkedWriteHandler

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-ChunkedFile和ChunkedWriteHandler相关的知识,希望对你有一定的参考价值。

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.util.internal.ObjectUtil;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * A @link ChunkedInput that fetches data from a file chunk by chunk.
 * <p>
 * If your operating system supports
 * <a href="http://en.wikipedia.org/wiki/Zero-copy">zero-copy file transfer</a>
 * such as @code sendfile(), you might want to use @link FileRegion instead.
 */
public class ChunkedFile implements ChunkedInput<ByteBuf> 

    private final RandomAccessFile file;
    private final long startOffset;
    private final long endOffset;
    private final int chunkSize;
    private long offset;

    /**
     * Creates a new instance that fetches data from the specified file.
     */
    public ChunkedFile(File file) throws IOException 
        this(file, ChunkedStream.DEFAULT_CHUNK_SIZE);
    

    /**
     * Creates a new instance that fetches data from the specified file.
     *
     * @param chunkSize the number of bytes to fetch on each
     *                  @link #readChunk(ChannelHandlerContext) call
     */
    //只读方式打开文件
    public ChunkedFile(File file, int chunkSize) throws IOException 
        this(new RandomAccessFile(file, "r"), chunkSize);
    

    /**
     * Creates a new instance that fetches data from the specified file.
     */
    public ChunkedFile(RandomAccessFile file) throws IOException 
        this(file, ChunkedStream.DEFAULT_CHUNK_SIZE);
    

    /**
     * Creates a new instance that fetches data from the specified file.
     *
     * @param chunkSize the number of bytes to fetch on each
     *                  @link #readChunk(ChannelHandlerContext) call
     */
    public ChunkedFile(RandomAccessFile file, int chunkSize) throws IOException 
        this(file, 0, file.length(), chunkSize);
    

    /**
     * Creates a new instance that fetches data from the specified file.
     *
     * @param offset the offset of the file where the transfer begins 文件读取起始位置偏移量
     * @param length the number of bytes to transfer   一共读取多少字节
     * @param chunkSize the number of bytes to fetch on each  每次循环调用readChunk读取的块大小
     *                  @link #readChunk(ChannelHandlerContext) call
     */
    public ChunkedFile(RandomAccessFile file, long offset, long length, int chunkSize) throws IOException 
        ObjectUtil.checkNotNull(file, "file");
        ObjectUtil.checkPositiveOrZero(offset, "offset");
        ObjectUtil.checkPositiveOrZero(length, "length");
        ObjectUtil.checkPositive(chunkSize, "chunkSize");

        this.file = file; //文件
        this.offset = startOffset = offset; //读取起始位置
        this.endOffset = offset + length; //读取结束位置
        this.chunkSize = chunkSize; //每次读取快大小

        //设置file的偏移量
        file.seek(offset);
    

    /**
     * Returns the offset in the file where the transfer began.
     */
    public long startOffset() 
        return startOffset;
    

    /**
     * Returns the offset in the file where the transfer will end.
     */
    public long endOffset() 
        return endOffset;
    

    /**
     * Returns the offset in the file where the transfer is happening currently.
     */
    public long currentOffset() 
        return offset;
    

    @Override
    public boolean isEndOfInput() throws Exception 
        //是否读取完毕
        //offset < endOffset = true 说明没读到最后
        //file.getChannel().isOpen()=true,说明文件打开状态
        //上诉俩种情况则任务没有读取完毕,二者出现任何一种情况都认为读取完毕
        return !(offset < endOffset && file.getChannel().isOpen());
    

    //关闭底层资源
    @Override
    public void close() throws Exception 
        file.close();
    

    @Deprecated
    @Override
    public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception 
        //读取底层文件
        return readChunk(ctx.alloc());
    

    @Override
    public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception 
        //如果已经到达结束偏移量,不需要再读取数据了
        long offset = this.offset;
        if (offset >= endOffset) 
            return null;
        

        //看看剩余的量与chunkSize块谁更小一些
        //如果剩余的字节非常大,则每次按照chunkSize去读取,如果剩余的字节数量不够一个chunkSize了,就读实际的数量。
        int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
        // Check if the buffer is backed by an byte array. If so we can optimize it a bit an safe a copy

        //分配ByteBuf,注意是堆内存
        ByteBuf buf = allocator.heapBuffer(chunkSize);
        boolean release = true;
        try 
            //读取固定长度chunkSize这个些个字节,如果字节不够会阻塞,如果返回EOF会抛出异常。
            file.readFully(buf.array(), buf.arrayOffset(), chunkSize);
            //重新设置ByteBuf的写索引
            buf.writerIndex(chunkSize);
            //偏移量增加chunkSize这么些字节.
            this.offset = offset + chunkSize;
            release = false;
            return buf;
         finally 
            if (release) 
                buf.release();
            
        
    

    //返回长度
    @Override
    public long length() 
        return endOffset - startOffset;
    

    //返回处理了多少字节,用当前的offset减去最开始的startOffset
    @Override
    public long progress() 
        return offset - startOffset;
    

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.stream;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * A @link ChannelHandler that adds support for writing a large data stream
 * asynchronously neither spending a lot of memory nor getting
 * @link OutOfMemoryError.  Large data streaming such as file
 * transfer requires complicated state management in a @link ChannelHandler
 * implementation.  @link ChunkedWriteHandler manages such complicated states
 * so that you can send a large data stream without difficulties.
 * <p>
 * To use @link ChunkedWriteHandler in your application, you have to insert
 * a new @link ChunkedWriteHandler instance:
 * <pre>
 * @link ChannelPipeline p = ...;
 * p.addLast("streamer", <b>new @link ChunkedWriteHandler()</b>);
 * p.addLast("handler", new MyHandler());
 * </pre>
 * Once inserted, you can write a @link ChunkedInput so that the
 * @link ChunkedWriteHandler can pick it up and fetch the content of the
 * stream chunk by chunk and write the fetched chunk downstream:
 * <pre>
 * @link Channel ch = ...;
 * ch.write(new @link ChunkedFile(new File("video.mkv"));
 * </pre>
 *
 * <h3>Sending a stream which generates a chunk intermittently</h3>
 *
 * Some @link ChunkedInput generates a chunk on a certain event or timing.
 * Such @link ChunkedInput implementation often returns @code null on
 * @link ChunkedInput#readChunk(ChannelHandlerContext), resulting in the indefinitely suspended
 * transfer.  To resume the transfer when a new chunk is available, you have to
 * call @link #resumeTransfer().
 */
public class ChunkedWriteHandler extends ChannelDuplexHandler 

    private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);

    //一个队列,用来存储需要输出的数据
    private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();

    //当前Context上下文
    private volatile ChannelHandlerContext ctx;

    public ChunkedWriteHandler() 
    

    /**
     * @deprecated use @link #ChunkedWriteHandler()
     */
    @Deprecated
    public ChunkedWriteHandler(int maxPendingWrites) 
        if (maxPendingWrites <= 0) 
            throw new IllegalArgumentException(
                    "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
        
    

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception 
        this.ctx = ctx;
    

    /**
     * Continues to fetch the chunks from the input.
     */
    public void resumeTransfer() 
        final ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) 
            return;
        
        if (ctx.executor().inEventLoop()) 
            resumeTransfer0(ctx);
         else 
            // let the transfer resume on the next event loop round
            ctx.executor().execute(new Runnable() 

                @Override
                public void run() 
                    resumeTransfer0(ctx);
                
            );
        
    

    private void resumeTransfer0(ChannelHandlerContext ctx) 
        try 
            doFlush(ctx);
         catch (Exception e) 
            logger.warn("Unexpected exception while sending chunks.", e);
        
    

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
        //write方法没有向下传递,而是封装为PendingWrite对象,并存入队列。
        queue.add(new PendingWrite(msg, promise));
    


    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception 
        //flush方法没有向下传递,根据情况进行输出
        doFlush(ctx);
    

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 
        //连接断开事件中,根据情况进行输出
        doFlush(ctx);
        ctx.fireChannelInactive();
    

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception 
        if (ctx.channel().isWritable()) 
            // channel is writable again try to continue flushing
            //如果当前channel再次切换到可写状态,则继续doFlush
            doFlush(ctx);
        
        ctx.fireChannelWritabilityChanged();
    

    private void discard(Throwable cause) 
        for (;;) 
            //循环每一个等待输出的对象
            PendingWrite currentWrite = queue.poll();

            //如果为null说明处理完毕
            if (currentWrite == null) 
                break;
            

            //拿到msg对象
            Object message = currentWrite.msg;

            if (message instanceof ChunkedInput) 
                //块对象处理
                ChunkedInput<?> in = (ChunkedInput<?>) message;
                boolean endOfInput;
                long inputLength;
                try 
                    //判断是否已经到达结尾
                    endOfInput = in.isEndOfInput();
                    //获取长度
                    inputLength = in.length();
                    //关闭资源
                    closeInput(in);
                 catch (Exception e) 
                    //关闭资源
                    closeInput(in);
                    //设置错误
                    currentWrite.fail(e);
                    if (logger.isWarnEnabled()) 
                        logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
                    
                    continue;
                

                //如果块资源还没输出干净
                if (!endOfInput) 
                    if (cause == null) 
                        cause = new ClosedChannelException();
                    
                    //设置为失败状态
                    currentWrite.fail(cause);
                 else 
                    //如果已经输出完毕,则设置为成功状态
                    currentWrite.success(inputLength);
                
             else 
                //普通对象设置promise失败即可
                if (cause == null) 
                    cause = new ClosedChannelException();
                
                currentWrite.fail(cause);
            
        
    

    private void doFlush(final ChannelHandlerContext ctx) 
        final Channel channel = ctx.channel();
        //首先要判断channel是否连接着
        if (!channel.isActive()) 
            //处理没有输出掉的对象
            discard(null);
            return;
        

        //是否需要调用底层flush
        boolean requiresFlush = true;

        //内存分配器
        ByteBufAllocator allocator = ctx.alloc();

        //如果channel还可以写就继续循环,如果底层队列写满这里会返回false
        while (channel.isWritable()) 
            //拿出队列头部但不移除元素
            final PendingWrite currentWrite = queue.peek();

            //如果为null则结束循环
            if (currentWrite == null) 
                break;
            

            //如果当前元素已经完成,则从队列中移除后继续循环下一个
            if (currentWrite.promise.isDone()) 
                // This might happen e.g. in the case when a write operation
                // failed, but there're still unconsumed chunks left.
                // Most chunked input sources would stop generating chunks
                // and report end of input, but this doesn't work with any
                // source wrapped in HttpChunkedInput.
                // Note, that we're not trying to release the message/chunks
                // as this had to be done already by someone who resolved the
                // promise (using ChunkedInput.close method).
                // See https://github.com/netty/netty/issues/8700.
                queue.remove();
                continue;
            

            //消息
            final Object pendingMessage = currentWrite.msg;

            //消息是块消息
            if (pendingMessage instanceof ChunkedInput) 
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try 
                    //读取一部分数据内容
                    message = chunks.readChunk(allocator);
                    //判断是否读取结束
                    endOfInput = chunks.isEndOfInput();

                    //如果消息等于null
                    if (message == null) 
                        // No need to suspend when reached at the end.
                        //如果此时消息块读取结束则endOfInput=treu,那么suspend=false,表示不需要暂停
                        //否则说明消息快还没结束,但是本次获取数据暂时没有。
                        suspend = !endOfInput;
                     else 
                        //如果suspend不为空,则不需要暂停
                        suspend = false;
                    
                 catch (final Throwable t) 
                    //异常处理,从队列中移除
                    queue.remove();

                    //释放msg
                    if (message != null) 
                        ReferenceCountUtil.release(message);
                    

                    //关闭资源
                    closeInput(chunks);
                    //设置失败
                    currentWrite.fail(t);
                    //结束循环
                    break;
                

                //如果需要暂停,那么也结束本次循环
                if (suspend) 
                    // ChunkedInput.nextChunk() returned null and it has
                    // not reached at the end of input. Let's wait until
                    // more chunks arrive. Nothing to write or notify.
                    break;
                

                //message为null,给一个空的ByteBuf对象
                if (message == null) 
                    // If message is null write an empty ByteBuf.
                    // See https://github.com/netty/netty/issues/1671
                    message = Unpooled.EMPTY_BUFFER;
                

                // Flush each chunk to conserve memory
                //向底层输出message
                ChannelFuture f = ctx.writeAndFlush(message);

                if (endOfInput) //如果块数据已经结束
                    queue.remove(); //移除

                    //如果f已经输出完毕
                    if (f.isDone()) 
                        handleEndOfInputFuture(f, currentWrite);
                     else 
                        // Register a listener which will close the input once the write is complete.
                        // This is needed because the Chunk may have some resource bound that can not
                        // be closed before its not written.
                        //
                        // See https://github.com/netty/netty/issues/303
                        //如果没有立即完成,则注册回调事件
                        f.addListener(new ChannelFutureListener() 
                            @Override
                            public void operationComplete(ChannelFuture future) 
                                handleEndOfInputFuture(future, currentWrite);
                            
                        );
                    
                 else  //如果块数据没写完
                    //是否需要重新开始
                    final boolean resume = !channel.isWritable();
                    if (f.isDone())  //上一个块底层socket写入成功
                        handleFuture(f, currentWrite, resume);
                     else 
                        //没有立即返回结果,注册事件
                        f.addListener(new ChannelFutureListener() 
                            @Override
                            public void operationComplete(ChannelFuture future) 
                                handleFuture(future, currentWrite, resume);
                            
                        );
                    
                
                requiresFlush = false;
             else 
                //不是块消息,则从队列中移除
                queue.remove();
                //向下传递后把requiresFlush设置为true
                ctx.write(pendingMessage, currentWrite.promise);
                requiresFlush = true;
            

            //如果channel关闭了,则处理关闭逻辑,释放资源。
            if (!channel.isActive()) 
                discard(new ClosedChannelException());
                break;
            
        

        //如果上面没有循环,这里需要向底层传递flush方法。
        if (requiresFlush) 
            ctx.flush();
        
    

    private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) 
        ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
        if (!future.isSuccess())  //底层socket输出失败
            closeInput(input); //关闭资源
            currentWrite.fail(future.cause()); //设置失败
         else 
            // read state of the input in local variables before closing it
            long inputProgress = input.progress();
            long inputLength = input.length();
            closeInput(input); //关闭资源
            currentWrite.progress(inputProgress, inputLength); //设置进度
            currentWrite.success(inputLength); //设置成功
        
    

    private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) 
        ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
        if (!future.isSuccess())  //写入失败
            closeInput(input); //关闭资源
            currentWrite.fail(future.cause());//设置失败
         else 
            //更新进度
            currentWrite.progress(input.progress(), input.length());

            //如果resume=false,说明当时channel还是可以写的,则顶层循环会继续
            //不需要在这里触发继续写的逻辑
            //否则此时再判断一下isWritable,如果可以写,则触发继续写的逻辑
            if (resume && future.channel().isWritable()) 
                resumeTransfer();
            
        
    

    private static void closeInput(ChunkedInput<?> chunks) 
        try 
            chunks.close();
         catch (Throwable t) 
            if (logger.isWarnEnabled()) 
                logger.warn("Failed to close a chunked input.", t);
            
        
    

    //封装原始消息
    private static final class PendingWrite 
        final Object msg; //等待输出的消息对象
        final ChannelPromise promise; //任务句柄

        PendingWrite(Object msg, ChannelPromise promise) 
            this.msg = msg;
            this.promise = promise;
        

        //设置失败
        void fail(Throwable cause) 
            ReferenceCountUtil.release(msg);
            promise.tryFailure(cause);
        

        //设置成功
        void success(long total) 
            if (promise.isDone()) 
                // No need to notify the progress or fulfill the promise because it's done already.
                return;
            
            //更新进度
            progress(total, total);
            promise.trySuccess();
        

        //更新进度
        void progress(long progress, long total) 
            if (promise instanceof ChannelProgressivePromise) 
                ((ChannelProgressivePromise) promise).tryProgress(progress, total);
            
        
    

 

以上是关于Netty源码分析-ChunkedFile和ChunkedWriteHandler的主要内容,如果未能解决你的问题,请参考以下文章

Netty源码分析(七) PoolChunk

netty里的ByteBuf扩容源码分析

Netty源码分析:read

Netty源码分析:read

Netty源码分析:PoolChunk

Netty源码分析 ----- write过程 源码分析