Netty源码分析-ChunkedFile和ChunkedWriteHandler
Posted 征服.刘华强
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-ChunkedFile和ChunkedWriteHandler相关的知识,希望对你有一定的参考价值。
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.stream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.netty.util.internal.ObjectUtil;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* A @link ChunkedInput that fetches data from a file chunk by chunk.
* <p>
* If your operating system supports
* <a href="http://en.wikipedia.org/wiki/Zero-copy">zero-copy file transfer</a>
* such as @code sendfile(), you might want to use @link FileRegion instead.
*/
public class ChunkedFile implements ChunkedInput<ByteBuf>
private final RandomAccessFile file;
private final long startOffset;
private final long endOffset;
private final int chunkSize;
private long offset;
/**
* Creates a new instance that fetches data from the specified file.
*/
public ChunkedFile(File file) throws IOException
this(file, ChunkedStream.DEFAULT_CHUNK_SIZE);
/**
* Creates a new instance that fetches data from the specified file.
*
* @param chunkSize the number of bytes to fetch on each
* @link #readChunk(ChannelHandlerContext) call
*/
//只读方式打开文件
public ChunkedFile(File file, int chunkSize) throws IOException
this(new RandomAccessFile(file, "r"), chunkSize);
/**
* Creates a new instance that fetches data from the specified file.
*/
public ChunkedFile(RandomAccessFile file) throws IOException
this(file, ChunkedStream.DEFAULT_CHUNK_SIZE);
/**
* Creates a new instance that fetches data from the specified file.
*
* @param chunkSize the number of bytes to fetch on each
* @link #readChunk(ChannelHandlerContext) call
*/
public ChunkedFile(RandomAccessFile file, int chunkSize) throws IOException
this(file, 0, file.length(), chunkSize);
/**
* Creates a new instance that fetches data from the specified file.
*
* @param offset the offset of the file where the transfer begins 文件读取起始位置偏移量
* @param length the number of bytes to transfer 一共读取多少字节
* @param chunkSize the number of bytes to fetch on each 每次循环调用readChunk读取的块大小
* @link #readChunk(ChannelHandlerContext) call
*/
public ChunkedFile(RandomAccessFile file, long offset, long length, int chunkSize) throws IOException
ObjectUtil.checkNotNull(file, "file");
ObjectUtil.checkPositiveOrZero(offset, "offset");
ObjectUtil.checkPositiveOrZero(length, "length");
ObjectUtil.checkPositive(chunkSize, "chunkSize");
this.file = file; //文件
this.offset = startOffset = offset; //读取起始位置
this.endOffset = offset + length; //读取结束位置
this.chunkSize = chunkSize; //每次读取快大小
//设置file的偏移量
file.seek(offset);
/**
* Returns the offset in the file where the transfer began.
*/
public long startOffset()
return startOffset;
/**
* Returns the offset in the file where the transfer will end.
*/
public long endOffset()
return endOffset;
/**
* Returns the offset in the file where the transfer is happening currently.
*/
public long currentOffset()
return offset;
@Override
public boolean isEndOfInput() throws Exception
//是否读取完毕
//offset < endOffset = true 说明没读到最后
//file.getChannel().isOpen()=true,说明文件打开状态
//上诉俩种情况则任务没有读取完毕,二者出现任何一种情况都认为读取完毕
return !(offset < endOffset && file.getChannel().isOpen());
//关闭底层资源
@Override
public void close() throws Exception
file.close();
@Deprecated
@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception
//读取底层文件
return readChunk(ctx.alloc());
@Override
public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception
//如果已经到达结束偏移量,不需要再读取数据了
long offset = this.offset;
if (offset >= endOffset)
return null;
//看看剩余的量与chunkSize块谁更小一些
//如果剩余的字节非常大,则每次按照chunkSize去读取,如果剩余的字节数量不够一个chunkSize了,就读实际的数量。
int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
// Check if the buffer is backed by an byte array. If so we can optimize it a bit an safe a copy
//分配ByteBuf,注意是堆内存
ByteBuf buf = allocator.heapBuffer(chunkSize);
boolean release = true;
try
//读取固定长度chunkSize这个些个字节,如果字节不够会阻塞,如果返回EOF会抛出异常。
file.readFully(buf.array(), buf.arrayOffset(), chunkSize);
//重新设置ByteBuf的写索引
buf.writerIndex(chunkSize);
//偏移量增加chunkSize这么些字节.
this.offset = offset + chunkSize;
release = false;
return buf;
finally
if (release)
buf.release();
//返回长度
@Override
public long length()
return endOffset - startOffset;
//返回处理了多少字节,用当前的offset减去最开始的startOffset
@Override
public long progress()
return offset - startOffset;
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.stream;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* A @link ChannelHandler that adds support for writing a large data stream
* asynchronously neither spending a lot of memory nor getting
* @link OutOfMemoryError. Large data streaming such as file
* transfer requires complicated state management in a @link ChannelHandler
* implementation. @link ChunkedWriteHandler manages such complicated states
* so that you can send a large data stream without difficulties.
* <p>
* To use @link ChunkedWriteHandler in your application, you have to insert
* a new @link ChunkedWriteHandler instance:
* <pre>
* @link ChannelPipeline p = ...;
* p.addLast("streamer", <b>new @link ChunkedWriteHandler()</b>);
* p.addLast("handler", new MyHandler());
* </pre>
* Once inserted, you can write a @link ChunkedInput so that the
* @link ChunkedWriteHandler can pick it up and fetch the content of the
* stream chunk by chunk and write the fetched chunk downstream:
* <pre>
* @link Channel ch = ...;
* ch.write(new @link ChunkedFile(new File("video.mkv"));
* </pre>
*
* <h3>Sending a stream which generates a chunk intermittently</h3>
*
* Some @link ChunkedInput generates a chunk on a certain event or timing.
* Such @link ChunkedInput implementation often returns @code null on
* @link ChunkedInput#readChunk(ChannelHandlerContext), resulting in the indefinitely suspended
* transfer. To resume the transfer when a new chunk is available, you have to
* call @link #resumeTransfer().
*/
public class ChunkedWriteHandler extends ChannelDuplexHandler
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
//一个队列,用来存储需要输出的数据
private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
//当前Context上下文
private volatile ChannelHandlerContext ctx;
public ChunkedWriteHandler()
/**
* @deprecated use @link #ChunkedWriteHandler()
*/
@Deprecated
public ChunkedWriteHandler(int maxPendingWrites)
if (maxPendingWrites <= 0)
throw new IllegalArgumentException(
"maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
this.ctx = ctx;
/**
* Continues to fetch the chunks from the input.
*/
public void resumeTransfer()
final ChannelHandlerContext ctx = this.ctx;
if (ctx == null)
return;
if (ctx.executor().inEventLoop())
resumeTransfer0(ctx);
else
// let the transfer resume on the next event loop round
ctx.executor().execute(new Runnable()
@Override
public void run()
resumeTransfer0(ctx);
);
private void resumeTransfer0(ChannelHandlerContext ctx)
try
doFlush(ctx);
catch (Exception e)
logger.warn("Unexpected exception while sending chunks.", e);
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
//write方法没有向下传递,而是封装为PendingWrite对象,并存入队列。
queue.add(new PendingWrite(msg, promise));
@Override
public void flush(ChannelHandlerContext ctx) throws Exception
//flush方法没有向下传递,根据情况进行输出
doFlush(ctx);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
//连接断开事件中,根据情况进行输出
doFlush(ctx);
ctx.fireChannelInactive();
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
if (ctx.channel().isWritable())
// channel is writable again try to continue flushing
//如果当前channel再次切换到可写状态,则继续doFlush
doFlush(ctx);
ctx.fireChannelWritabilityChanged();
private void discard(Throwable cause)
for (;;)
//循环每一个等待输出的对象
PendingWrite currentWrite = queue.poll();
//如果为null说明处理完毕
if (currentWrite == null)
break;
//拿到msg对象
Object message = currentWrite.msg;
if (message instanceof ChunkedInput)
//块对象处理
ChunkedInput<?> in = (ChunkedInput<?>) message;
boolean endOfInput;
long inputLength;
try
//判断是否已经到达结尾
endOfInput = in.isEndOfInput();
//获取长度
inputLength = in.length();
//关闭资源
closeInput(in);
catch (Exception e)
//关闭资源
closeInput(in);
//设置错误
currentWrite.fail(e);
if (logger.isWarnEnabled())
logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
continue;
//如果块资源还没输出干净
if (!endOfInput)
if (cause == null)
cause = new ClosedChannelException();
//设置为失败状态
currentWrite.fail(cause);
else
//如果已经输出完毕,则设置为成功状态
currentWrite.success(inputLength);
else
//普通对象设置promise失败即可
if (cause == null)
cause = new ClosedChannelException();
currentWrite.fail(cause);
private void doFlush(final ChannelHandlerContext ctx)
final Channel channel = ctx.channel();
//首先要判断channel是否连接着
if (!channel.isActive())
//处理没有输出掉的对象
discard(null);
return;
//是否需要调用底层flush
boolean requiresFlush = true;
//内存分配器
ByteBufAllocator allocator = ctx.alloc();
//如果channel还可以写就继续循环,如果底层队列写满这里会返回false
while (channel.isWritable())
//拿出队列头部但不移除元素
final PendingWrite currentWrite = queue.peek();
//如果为null则结束循环
if (currentWrite == null)
break;
//如果当前元素已经完成,则从队列中移除后继续循环下一个
if (currentWrite.promise.isDone())
// This might happen e.g. in the case when a write operation
// failed, but there're still unconsumed chunks left.
// Most chunked input sources would stop generating chunks
// and report end of input, but this doesn't work with any
// source wrapped in HttpChunkedInput.
// Note, that we're not trying to release the message/chunks
// as this had to be done already by someone who resolved the
// promise (using ChunkedInput.close method).
// See https://github.com/netty/netty/issues/8700.
queue.remove();
continue;
//消息
final Object pendingMessage = currentWrite.msg;
//消息是块消息
if (pendingMessage instanceof ChunkedInput)
final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
boolean endOfInput;
boolean suspend;
Object message = null;
try
//读取一部分数据内容
message = chunks.readChunk(allocator);
//判断是否读取结束
endOfInput = chunks.isEndOfInput();
//如果消息等于null
if (message == null)
// No need to suspend when reached at the end.
//如果此时消息块读取结束则endOfInput=treu,那么suspend=false,表示不需要暂停
//否则说明消息快还没结束,但是本次获取数据暂时没有。
suspend = !endOfInput;
else
//如果suspend不为空,则不需要暂停
suspend = false;
catch (final Throwable t)
//异常处理,从队列中移除
queue.remove();
//释放msg
if (message != null)
ReferenceCountUtil.release(message);
//关闭资源
closeInput(chunks);
//设置失败
currentWrite.fail(t);
//结束循环
break;
//如果需要暂停,那么也结束本次循环
if (suspend)
// ChunkedInput.nextChunk() returned null and it has
// not reached at the end of input. Let's wait until
// more chunks arrive. Nothing to write or notify.
break;
//message为null,给一个空的ByteBuf对象
if (message == null)
// If message is null write an empty ByteBuf.
// See https://github.com/netty/netty/issues/1671
message = Unpooled.EMPTY_BUFFER;
// Flush each chunk to conserve memory
//向底层输出message
ChannelFuture f = ctx.writeAndFlush(message);
if (endOfInput) //如果块数据已经结束
queue.remove(); //移除
//如果f已经输出完毕
if (f.isDone())
handleEndOfInputFuture(f, currentWrite);
else
// Register a listener which will close the input once the write is complete.
// This is needed because the Chunk may have some resource bound that can not
// be closed before its not written.
//
// See https://github.com/netty/netty/issues/303
//如果没有立即完成,则注册回调事件
f.addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future)
handleEndOfInputFuture(future, currentWrite);
);
else //如果块数据没写完
//是否需要重新开始
final boolean resume = !channel.isWritable();
if (f.isDone()) //上一个块底层socket写入成功
handleFuture(f, currentWrite, resume);
else
//没有立即返回结果,注册事件
f.addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future)
handleFuture(future, currentWrite, resume);
);
requiresFlush = false;
else
//不是块消息,则从队列中移除
queue.remove();
//向下传递后把requiresFlush设置为true
ctx.write(pendingMessage, currentWrite.promise);
requiresFlush = true;
//如果channel关闭了,则处理关闭逻辑,释放资源。
if (!channel.isActive())
discard(new ClosedChannelException());
break;
//如果上面没有循环,这里需要向底层传递flush方法。
if (requiresFlush)
ctx.flush();
private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite)
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
if (!future.isSuccess()) //底层socket输出失败
closeInput(input); //关闭资源
currentWrite.fail(future.cause()); //设置失败
else
// read state of the input in local variables before closing it
long inputProgress = input.progress();
long inputLength = input.length();
closeInput(input); //关闭资源
currentWrite.progress(inputProgress, inputLength); //设置进度
currentWrite.success(inputLength); //设置成功
private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume)
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
if (!future.isSuccess()) //写入失败
closeInput(input); //关闭资源
currentWrite.fail(future.cause());//设置失败
else
//更新进度
currentWrite.progress(input.progress(), input.length());
//如果resume=false,说明当时channel还是可以写的,则顶层循环会继续
//不需要在这里触发继续写的逻辑
//否则此时再判断一下isWritable,如果可以写,则触发继续写的逻辑
if (resume && future.channel().isWritable())
resumeTransfer();
private static void closeInput(ChunkedInput<?> chunks)
try
chunks.close();
catch (Throwable t)
if (logger.isWarnEnabled())
logger.warn("Failed to close a chunked input.", t);
//封装原始消息
private static final class PendingWrite
final Object msg; //等待输出的消息对象
final ChannelPromise promise; //任务句柄
PendingWrite(Object msg, ChannelPromise promise)
this.msg = msg;
this.promise = promise;
//设置失败
void fail(Throwable cause)
ReferenceCountUtil.release(msg);
promise.tryFailure(cause);
//设置成功
void success(long total)
if (promise.isDone())
// No need to notify the progress or fulfill the promise because it's done already.
return;
//更新进度
progress(total, total);
promise.trySuccess();
//更新进度
void progress(long progress, long total)
if (promise instanceof ChannelProgressivePromise)
((ChannelProgressivePromise) promise).tryProgress(progress, total);
以上是关于Netty源码分析-ChunkedFile和ChunkedWriteHandler的主要内容,如果未能解决你的问题,请参考以下文章