Java AsyncHttpClient:从 LazyResponseBodyPart 写入 AsynchronousFileChannel 时文件损坏
Posted
技术标签:
【中文标题】Java AsyncHttpClient:从 LazyResponseBodyPart 写入 AsynchronousFileChannel 时文件损坏【英文标题】:Java AsyncHttpClient: broken file while writing from LazyResponseBodyPart to AsynchronousFileChannel 【发布时间】:2019-10-17 06:16:54 【问题描述】:我将AsyncHttpClient library 用于异步非阻塞请求。 我的情况:通过网络接收数据时将数据写入文件。
对于从远程主机下载文件并保存到文件,我使用默认的ResponseBodyPartFactory.EAGER
和AsynchronousFileChannel
,以免在数据到达时阻塞 netty 线程。但正如我的测量结果所示,与LAZY
相比,Java 堆中的内存消耗增加了很多倍。
所以我决定直接去LAZY
,但没有考虑文件的后果。
此代码将有助于重现问题。:
public static class AsyncChannelWriter
private final CompletableFuture<Integer> startPosition;
private final AsynchronousFileChannel channel;
public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException
this.channel = channel;
this.startPosition = CompletableFuture.completedFuture((int) channel.size());
public CompletableFuture<Integer> getStartPosition()
return startPosition;
public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition)
return currentPosition.thenCompose(position ->
CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>()
@Override
public void completed(Integer result, ByteBuffer attachment)
writenBytes.complete(result);
@Override
public void failed(Throwable exc, ByteBuffer attachment)
writenBytes.completeExceptionally(exc);
);
return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
);
public void close(CompletableFuture<Integer> currentPosition)
currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
public static void main(String[] args) throws IOException
final String filepath = "/media/veracrypt4/files/1.jpg";
final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
client.prepareGet(downloadUrl)
.execute(new AsyncCompletionHandler<Response>()
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
return State.CONTINUE;
@Override
public Response onCompleted(Response response)
asyncChannelWriter.close(atomicReferencePosition.get());
return response;
);
在这种情况下,图片已损坏。但是如果我使用FileChannel
而不是AsynchronousFileChannel
,在这两种情况下,文件都会正常显示。使用DirectByteBuffer
(如果使用LazyResponseBodyPart.getBodyByteBuffer()
)和AsynchronousFileChannel
时是否有任何细微差别?
如果EAGER
一切正常,我的代码可能有什么问题?
更新
我注意到,如果我使用LAZY
,例如,我添加了行
Thread.sleep (10)
在方法onBodyPartReceived
中,像这样:
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
Thread.sleep(10);
return State.CONTINUE;
文件以非损坏状态保存到磁盘。
据我了解,原因是在这 10 毫秒内,AsynchronousFileChannel
中的异步线程设法从这个DirectByteBuffer
向磁盘写入数据。
结果文件被破坏是因为这个异步线程使用这个缓冲区与netty线程一起写入。
如果我们用EagerResponseBodyPart
查看源代码,那么我们将看到以下内容
private final byte[] bytes;
public EagerResponseBodyPart(ByteBuf buf, boolean last)
super(last);
bytes = byteBuf2Bytes(buf);
@Override
public ByteBuffer getBodyByteBuffer()
return ByteBuffer.wrap(bytes);
因此,当一条数据到达时,它立即存储在字节数组中。然后我们可以安全地将它们包装在 HeapByteBuffer 中,并在文件通道中传输到异步线程。
但是如果你看代码LazyResponseBodyPart
private final ByteBuf buf;
public LazyResponseBodyPart(ByteBuf buf, boolean last)
super(last);
this.buf = buf;
@Override
public ByteBuffer getBodyByteBuffer()
return buf.nioBuffer();
如你所见,我们实际上通过方法调用nioBuffer
在异步文件通道线程nettyByteBuff
(在这种情况下总是PooledSlicedByteBuf
)使用@
在这种情况下我该怎么办,如何在异步线程中安全地传递 DirectByteBuffer
而无需将缓冲区复制到 Java 堆?
【问题讨论】:
为什么不使用BodyDeferringAsyncHandler
让生活更简单?
@MạnhQuyếtNguyễn 因为它无效?我使用这个客户端来减少内存消耗和 CPU 资源。对于简单的生活,我可以使用 apache 同步客户端。顺便说一句,BodyDeferringAsyncHandler
在内存消耗方面与我使用EAGER
的示例没有什么不同,因为BodyDeferringAsyncHandler
使用getBodyPartBytes
方法。我不确定,但可能在使用BodyDeferringAsyncHandler
时,线程会在写入OutputStream
时阻塞。
仅供参考:调用client.prepareGet(downloadUrl).execute
的线程未被阻塞。保持简单
@MạnhQuyếtNguyễn 当然可以,但是处理数据的线程会被阻塞。
总有一个线程被阻塞:真正写数据的那个
【参考方案1】:
我与AsyncHttpClient
的维护者进行了交谈。
Can see here
主要问题是我不使用 netty ByteBuf 方法 retain
和 release
。
最后,我想到了两个解决方案。
首先:将字节顺序写入到位置为CompletableFuture
的文件中。
为AsynchronousFileChannel
定义包装类
@Log4j2
public class AsyncChannelNettyByteBufWriter implements Closeable
private final AtomicReference<CompletableFuture<Long>> positionReference;
private final AsynchronousFileChannel channel;
public AsyncChannelNettyByteBufWriter(AsynchronousFileChannel channel)
this.channel = channel;
try
this.positionReference = new AtomicReference<>(CompletableFuture.completedFuture(channel.size()));
catch (IOException e)
throw new UncheckedIOException(e);
public CompletableFuture<Long> write(ByteBuf byteBuffer)
final ByteBuf byteBuf = byteBuffer.retain();
return positionReference.updateAndGet(x -> x.thenCompose(position ->
final CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
channel.write(byteBuf.nioBuffer(), position, byteBuf, new CompletionHandler<Integer, ByteBuf>()
@Override
public void completed(Integer result, ByteBuf attachment)
attachment.release();
writenBytes.complete(result);
@Override
public void failed(Throwable exc, ByteBuf attachment)
attachment.release();
log.error(exc);
writenBytes.completeExceptionally(exc);
);
return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
));
public void close()
positionReference.updateAndGet(x -> x.whenComplete((position, throwable) ->
try
channel.close();
catch (IOException e)
throw new UncheckedIOException(e);
));
事实上,这里可能不会有AtomicReference
,如果录制发生在一个线程中,如果是多个线程,那么我们需要认真对待同步。
及主要用途。
public static void main(String[] args) throws IOException
final String filepath = "1.jpg";
final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
final AsyncChannelNettyByteBufWriter asyncChannelNettyByteBufWriter = new AsyncChannelNettyByteBufWriter(channel);
client.prepareGet(downloadUrl)
.execute(new AsyncCompletionHandler<Response>()
@Override
public State onBodyPartReceived(HttpResponseBodyPart content)
final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf();
asyncChannelNettyByteBufWriter.write(byteBuf);
return State.CONTINUE;
@Override
public Response onCompleted(Response response)
asyncChannelNettyByteBufWriter.close();
return response;
);
第二种解决方案:根据接收到的字节大小跟踪位置。
public static void main(String[] args) throws IOException
final String filepath = "1.jpg";
final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), new HashSet<>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)), executorService);
client.prepareGet(downloadUrl)
.execute(new AsyncCompletionHandler<Response>()
private long position = 0;
@Override
public State onBodyPartReceived(HttpResponseBodyPart content)
final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
long currentPosition = position;
position+=byteBuf.readableBytes();
channel.write(byteBuf.nioBuffer(), currentPosition, byteBuf, new CompletionHandler<Integer, ByteBuf>()
@Override
public void completed(Integer result, ByteBuf attachment)
attachment.release();
if(content.isLast())
try
channel.close();
catch (IOException e)
throw new UncheckedIOException(e);
@Override
public void failed(Throwable exc, ByteBuf attachment)
attachment.release();
try
channel.close();
catch (IOException e)
throw new UncheckedIOException(e);
);
return State.CONTINUE;
@Override
public Response onCompleted(Response response)
return response;
);
在第二种解决方案中,因为我们不等到一些字节写入文件,AsynchronousFileChannel
可以创建很多线程(如果你使用 Linux,因为 Linux 没有实现非阻塞异步文件 IO。在 Windows 中,情况要好得多)。
正如我的测量结果所示,在写入慢速 USB 闪存的情况下,线程数可能达到数万,因此您需要通过创建 ExecutorService
并将其传输到AsynchronousFileChannel
.
第一种方案和第二种方案有明显的优缺点吗?我很难说。也许有人能说出什么更有效。
【讨论】:
您建议的第一个解决方案不会也遇到相同的线程问题吗?因为它也在使用AsynchronousFileChannel
。
@shays10 会,但明显更少。因为在第一种解决方案中,字节是按顺序写入文件的,直到写入前一个字节才会写入新的部分字节。在第二种情况下,我们不等到前一部分字节写入文件,所以在慢速记录的情况下,我们创建了很多线程。您可以通过在慢速存储(例如便宜的闪存驱动器)上模拟录制来检查这一点,并检查创建了多少线程。以上是关于Java AsyncHttpClient:从 LazyResponseBodyPart 写入 AsynchronousFileChannel 时文件损坏的主要内容,如果未能解决你的问题,请参考以下文章
android AsyncHttpClient 的作用好处?
安卓的异步下载(Asynchttpclient以及Volley)