使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件

Posted

技术标签:

【中文标题】使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件【英文标题】:Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor 【发布时间】:2018-10-09 23:47:40 【问题描述】:

在 Project Reactor 中下载完成后,我无法正确保存文件。

class HttpImageClientDownloader implements ImageClientDownloader 

    private final ExchangeFunction exchangeFunction;

    HttpImageClientDownloader() 
        this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
    

    @Override
    public Mono<File> downloadImage(String url, Path destination) 

        ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
        return exchangeFunction.exchange(clientRequest)
                .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                //.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                .flatMap(dataBuffer -> 

                    AsynchronousFileChannel fileChannel = createFile(destination);
                    return DataBufferUtils
                            .write(dataBuffer, fileChannel, 0)
                            .publishOn(Schedulers.elastic())
                            .doOnNext(DataBufferUtils::release)
                            .then(Mono.just(destination.toFile()));


                );

    

    private AsynchronousFileChannel createFile(Path path) 
        try 
            return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
         catch (Exception e) 
            throw new ImageDownloadException("Error while creating file: " + path, e);
        
    

所以我的问题是: DataBufferUtils.write(dataBuffer, fileChannel, 0) 是否阻塞?

磁盘慢的时候怎么办?

关于 ImageDownloadException 发生时会发生什么的第二个问题, 在 doOnNext 中我想释放给定的数据缓冲区,是这种操作的好地方吗?

我也觉得这行:

            .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))

可能会阻塞...

【问题讨论】:

【参考方案1】:

这是实现这一目标的另一种(更短的)方法:

Flux<DataBuffer> data = this.webClient.get()
        .uri("/greeting")
        .retrieve()
        .bodyToFlux(DataBuffer.class);

Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
        .map(DataBufferUtils::release)
        .then(Mono.just(file));

现在DataBufferUtils::write 操作不会阻塞,因为它们使用带有通道的非阻塞 IO。写入此类通道意味着它将尽可能写入输出缓冲区(即可以写入所有 DataBuffer 或只是其中的一部分)。

使用Flux::mapFlux::doOnNext 是执行此操作的正确位置。但你是对的,如果发生错误,你仍然有责任释放当前缓冲区(以及所有剩余的缓冲区)。 Spring Framework 中可能有一些我们可以改进的地方,请关注SPR-16782。

我看不出您的上一个示例如何显示任何阻塞:所有方法都返回反应类型,并且没有一个正在阻塞 I/O。

【讨论】:

嗨@Brian Clozel。我尝试了上面的示例,但在下载内容后我无法删除文件。该文件被 Java VM 阻止。你有什么建议给我吗?

以上是关于使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件的主要内容,如果未能解决你的问题,请参考以下文章

关于 Project Reactor 的 flatMap 中的线程的困惑

使用Project Reactor对反应流进行递归

线程1:EXC_BAD_ACCESS (code=1) error c++ project

#yyds干货盘点#Project Reactor

Project Reactor:仅在未发出第一项时通量超时

Project Reactor 之 publishOn 与 subscribeOn