Java AsyncHttpClient:从 LazyResponseBodyPart 写入 AsynchronousFileChannel 时文件损坏

Posted

技术标签:

【中文标题】Java AsyncHttpClient:从 LazyResponseBodyPart 写入 AsynchronousFileChannel 时文件损坏【英文标题】:Java AsyncHttpClient: broken file while writing from LazyResponseBodyPart to AsynchronousFileChannel 【发布时间】:2019-10-17 06:16:54 【问题描述】:

我将AsyncHttpClient library 用于异步非阻塞请求。 我的情况:通过网络接收数据时将数据写入文件。

对于从远程主机下载文件并保存到文件,我使用默认的ResponseBodyPartFactory.EAGERAsynchronousFileChannel,以免在数据到达时阻塞 netty 线程。但正如我的测量结果所示,与LAZY 相比,Java 堆中的内存消耗增加了很多倍。

所以我决定直接去LAZY,但没有考虑文件的后果。

此代码将有助于重现问题。:

public static class AsyncChannelWriter 
     private final CompletableFuture<Integer> startPosition;
     private final AsynchronousFileChannel channel;

     public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException 
         this.channel = channel;
         this.startPosition = CompletableFuture.completedFuture((int) channel.size());
     

     public CompletableFuture<Integer> getStartPosition() 
         return startPosition;
     

     public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) 

         return currentPosition.thenCompose(position -> 
             CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
             channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() 
                 @Override
                 public void completed(Integer result, ByteBuffer attachment) 
                     writenBytes.complete(result);
                 

                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) 
                     writenBytes.completeExceptionally(exc);
                 
             );
             return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
         );
     

     public void close(CompletableFuture<Integer> currentPosition) 
         currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
     
 

 public static void main(String[] args) throws IOException 
     final String filepath = "/media/veracrypt4/files/1.jpg";
     final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";

     final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
             .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
     final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
     final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
     final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
     client.prepareGet(downloadUrl)
             .execute(new AsyncCompletionHandler<Response>() 

                 @Override
                 public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception 
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
                     final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
                     final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
                     final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
                     atomicReferencePosition.set(newPosition);
                     return State.CONTINUE;
                 

                 @Override
                 public Response onCompleted(Response response) 
                     asyncChannelWriter.close(atomicReferencePosition.get());
                     return response;
                 
             );

在这种情况下,图片已损坏。但是如果我使用FileChannel 而不是AsynchronousFileChannel,在这两种情况下,文件都会正常显示。使用DirectByteBuffer(如果使用LazyResponseBodyPart.getBodyByteBuffer())和AsynchronousFileChannel 时是否有任何细微差别?

如果EAGER 一切正常,我的代码可能有什么问题?


更新

我注意到,如果我使用LAZY,例如,我添加了行 Thread.sleep (10)在方法onBodyPartReceived中,像这样:

 @Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception 
    final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
    final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
    final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
    atomicReferencePosition.set(newPosition);
    Thread.sleep(10);
    return State.CONTINUE;

文件以非损坏状态保存到磁盘。

据我了解,原因是在这 10 毫秒内,AsynchronousFileChannel 中的异步线程设法从这个DirectByteBuffer 向磁盘写入数据。

结果文件被破坏是因为这个异步线程使用这个缓冲区与netty线程一起写入。

如果我们用EagerResponseBodyPart查看源代码,那么我们将看到以下内容

private final byte[] bytes;
  public EagerResponseBodyPart(ByteBuf buf, boolean last) 
    super(last);
    bytes = byteBuf2Bytes(buf);
  

  @Override
  public ByteBuffer getBodyByteBuffer() 
    return ByteBuffer.wrap(bytes);
  

因此,当一条数据到达时,它立即存储在字节数组中。然后我们可以安全地将它们包装在 HeapByteBuffer 中,并在文件通道中传输到异步线程。

但是如果你看代码LazyResponseBodyPart

  private final ByteBuf buf;

  public LazyResponseBodyPart(ByteBuf buf, boolean last) 
    super(last);
    this.buf = buf;
  
  @Override
  public ByteBuffer getBodyByteBuffer() 
    return buf.nioBuffer();
  

如你所见,我们实际上通过方法调用nioBuffer在异步文件通道线程nettyByteBuff(在这种情况下总是PooledSlicedByteBuf)使用@

在这种情况下我该怎么办,如何在异步线程中安全地传递 DirectByteBuffer 而无需将缓冲区复制到 Java 堆?

【问题讨论】:

为什么不使用BodyDeferringAsyncHandler 让生活更简单? @MạnhQuyếtNguyễn 因为它无效?我使用这个客户端来减少内存消耗和 CPU 资源。对于简单的生活,我可以使用 apache 同步客户端。顺便说一句,BodyDeferringAsyncHandler 在内存消耗方面与我使用EAGER 的示例没有什么不同,因为BodyDeferringAsyncHandler 使用getBodyPartBytes 方法。我不确定,但可能在使用BodyDeferringAsyncHandler 时,线程会在写入OutputStream 时阻塞。 仅供参考:调用client.prepareGet(downloadUrl).execute 的线程未被阻塞。保持简单 @MạnhQuyếtNguyễn 当然可以,但是处理数据的线程会被阻塞。 总有一个线程被阻塞:真正写数据的那个 【参考方案1】:

我与AsyncHttpClient 的维护者进行了交谈。 Can see here

主要问题是我不使用 netty ByteBuf 方法 retainrelease。 最后,我想到了两个解决方案。

首先:将字节顺序写入到位置为CompletableFuture的文件中。

AsynchronousFileChannel定义包装类

@Log4j2
public class AsyncChannelNettyByteBufWriter implements Closeable 
    private final AtomicReference<CompletableFuture<Long>> positionReference;
    private final AsynchronousFileChannel channel;

    public AsyncChannelNettyByteBufWriter(AsynchronousFileChannel channel) 
        this.channel = channel;
        try 
            this.positionReference = new AtomicReference<>(CompletableFuture.completedFuture(channel.size()));
         catch (IOException e) 
            throw new UncheckedIOException(e);
        
    

    public CompletableFuture<Long> write(ByteBuf byteBuffer) 
        final ByteBuf byteBuf = byteBuffer.retain();
        return positionReference.updateAndGet(x -> x.thenCompose(position -> 
            final CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
            channel.write(byteBuf.nioBuffer(), position, byteBuf, new CompletionHandler<Integer, ByteBuf>() 
                @Override
                public void completed(Integer result, ByteBuf attachment) 
                    attachment.release();
                    writenBytes.complete(result);
                

                @Override
                public void failed(Throwable exc, ByteBuf attachment) 
                    attachment.release();
                    log.error(exc);
                    writenBytes.completeExceptionally(exc);
                
            );
            return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
        ));
    

    public void close() 
        positionReference.updateAndGet(x -> x.whenComplete((position, throwable) -> 
            try 
                channel.close();
             catch (IOException e) 
                throw new UncheckedIOException(e);
            
        ));
    

事实上,这里可能不会有AtomicReference,如果录制发生在一个线程中,如果是多个线程,那么我们需要认真对待同步。

及主要用途。

public static void main(String[] args) throws IOException 
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
    final AsyncChannelNettyByteBufWriter asyncChannelNettyByteBufWriter = new AsyncChannelNettyByteBufWriter(channel);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() 
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) 
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf();
                    asyncChannelNettyByteBufWriter.write(byteBuf);
                    return State.CONTINUE;
                

                @Override
                public Response onCompleted(Response response) 
                    asyncChannelNettyByteBufWriter.close();
                    return response;
                
            );

第二种解决方案:根据接收到的字节大小跟踪位置。

public static void main(String[] args) throws IOException 
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), new HashSet<>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)), executorService);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() 

                private long position = 0;
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) 
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
                    long currentPosition = position;
                    position+=byteBuf.readableBytes();
                    channel.write(byteBuf.nioBuffer(), currentPosition, byteBuf, new CompletionHandler<Integer, ByteBuf>() 
                        @Override
                        public void completed(Integer result, ByteBuf attachment) 
                            attachment.release();
                            if(content.isLast())
                                try 
                                    channel.close();
                                 catch (IOException e) 
                                    throw new UncheckedIOException(e);
                                
                            
                        

                        @Override
                        public void failed(Throwable exc, ByteBuf attachment) 
                            attachment.release();
                            try 
                                channel.close();
                             catch (IOException e) 
                                throw new UncheckedIOException(e);
                            
                        
                    );
                    return State.CONTINUE;
                
                @Override
                public Response onCompleted(Response response) 
                    return response;
                
            );

在第二种解决方案中,因为我们不等到一些字节写入文件,AsynchronousFileChannel 可以创建很多线程(如果你使用 Linux,因为 Linux 没有实现非阻塞异步文件 IO。在 Windows 中,情况要好得多)。

正如我的测量结果所示,在写入慢速 USB 闪存的情况下,线程数可能达到数万,因此您需要通过创建 ExecutorService 并将其传输到AsynchronousFileChannel.

第一种方案和第二种方案有明显的优缺点吗?我很难说。也许有人能说出什么更有效。

【讨论】:

您建议的第一个解决方案不会也遇到相同的线程问题吗?因为它也在使用AsynchronousFileChannel @shays10 会,但明显更少。因为在第一种解决方案中,字节是按顺序写入文件的,直到写入前一个字节才会写入新的部分字节。在第二种情况下,我们不等到前一部分字节写入文件,所以在慢速记录的情况下,我们创建了很多线程。您可以通过在慢速存储(例如便宜的闪存驱动器)上模拟录制来检查这一点,并检查创建了多少线程。

以上是关于Java AsyncHttpClient:从 LazyResponseBodyPart 写入 AsynchronousFileChannel 时文件损坏的主要内容,如果未能解决你的问题,请参考以下文章

android AsyncHttpClient 的作用好处?

如何在AsyncHttpClient中停止重定向并获取响应

安卓的异步下载(Asynchttpclient以及Volley)

AsyncHttpClient

HttpClient学习—— AsyncHttpClient使用

如何与Netty密切asynchttpclient异步HTTP请求