使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件
Posted
技术标签:
【中文标题】使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件【英文标题】:Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor 【发布时间】:2018-10-09 23:47:40 【问题描述】:在 Project Reactor 中下载完成后,我无法正确保存文件。
class HttpImageClientDownloader implements ImageClientDownloader
private final ExchangeFunction exchangeFunction;
HttpImageClientDownloader()
this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
@Override
public Mono<File> downloadImage(String url, Path destination)
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
return exchangeFunction.exchange(clientRequest)
.map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
//.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
.flatMap(dataBuffer ->
AsynchronousFileChannel fileChannel = createFile(destination);
return DataBufferUtils
.write(dataBuffer, fileChannel, 0)
.publishOn(Schedulers.elastic())
.doOnNext(DataBufferUtils::release)
.then(Mono.just(destination.toFile()));
);
private AsynchronousFileChannel createFile(Path path)
try
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
catch (Exception e)
throw new ImageDownloadException("Error while creating file: " + path, e);
所以我的问题是: DataBufferUtils.write(dataBuffer, fileChannel, 0) 是否阻塞?
磁盘慢的时候怎么办?
关于 ImageDownloadException 发生时会发生什么的第二个问题, 在 doOnNext 中我想释放给定的数据缓冲区,是这种操作的好地方吗?
我也觉得这行:
.map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
可能会阻塞...
【问题讨论】:
【参考方案1】:这是实现这一目标的另一种(更短的)方法:
Flux<DataBuffer> data = this.webClient.get()
.uri("/greeting")
.retrieve()
.bodyToFlux(DataBuffer.class);
Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
.map(DataBufferUtils::release)
.then(Mono.just(file));
现在DataBufferUtils::write
操作不会阻塞,因为它们使用带有通道的非阻塞 IO。写入此类通道意味着它将尽可能写入输出缓冲区(即可以写入所有 DataBuffer
或只是其中的一部分)。
使用Flux::map
或Flux::doOnNext
是执行此操作的正确位置。但你是对的,如果发生错误,你仍然有责任释放当前缓冲区(以及所有剩余的缓冲区)。 Spring Framework 中可能有一些我们可以改进的地方,请关注SPR-16782。
我看不出您的上一个示例如何显示任何阻塞:所有方法都返回反应类型,并且没有一个正在阻塞 I/O。
【讨论】:
嗨@Brian Clozel。我尝试了上面的示例,但在下载内容后我无法删除文件。该文件被 Java VM 阻止。你有什么建议给我吗?以上是关于使用 Project Reactor 中的 ExchangeFunction 从客户端请求中下载并保存文件的主要内容,如果未能解决你的问题,请参考以下文章
关于 Project Reactor 的 flatMap 中的线程的困惑