Spring WebClient:如何将大字节 [] 流式传输到文件?

Posted

技术标签:

【中文标题】Spring WebClient:如何将大字节 [] 流式传输到文件?【英文标题】:Spring WebClient: How to stream large byte[] to file? 【发布时间】:2019-10-06 04:32:36 【问题描述】:

似乎 Spring RestTemplate 无法将响应直接流式传输到文件而不将其全部缓冲在内存中。使用较新的 Spring 5 WebClient 实现此目的的正确方法是什么?

WebClient client = WebClient.create("https://example.com");
client.get().uri(".../name", name).accept(MediaType.APPLICATION_OCTET_STREAM)
                    ....?

我看到人们使用RestTemplate 找到了解决此问题的一些解决方法/技巧,但我更感兴趣的是使用WebClient 以正确的方式解决问题。

使用RestTemplate 下载二进制数据的例子很多,但几乎都将byte[] 加载到内存中。

【问题讨论】:

谢谢,但这并没有说明如何使用 WebClient。 我不认为它回答了这个问题。如果您认为可以,请创建一个答案。 Spring WebFlux Webclient receiving an application/octet-stream file as a Mono的可能重复 @K.Nicholas - 你真的认为这是那个问题的重复吗?一方面,该问题没有提到直接流式传输到文件(没有将整个响应保存在内存中),这是我问题的重点;而且这个问题是使用 Kotlin,而不是 Java。 > 是的,你是对的,应该将其标记为离题。 @K.Nicholas 我不知道你为什么一直试图找到一种方法来破坏我的问题,但请随时查看***.com/help/on-topic 和行为准则。 【参考方案1】:

使用最近稳定的 Spring WebFlux(撰写时为 5.2.4.RELEASE):

final WebClient client = WebClient.create("https://example.com");
final Flux<DataBuffer> dataBufferFlux = client.get()
        .accept(MediaType.TEXT_html)
        .retrieve()
        .bodyToFlux(DataBuffer.class); // the magic happens here

final Path path = FileSystems.getDefault().getPath("target/example.html");
DataBufferUtils
        .write(dataBufferFlux, path, CREATE_NEW)
        .block(); // only block here if the rest of your code is synchronous

对我来说,不明显的部分是 bodyToFlux(DataBuffer.class),因为它目前在 Spring 文档的 generic section about streaming 中提到,在 WebClient 部分中没有直接引用它。

【讨论】:

DataBuffer 基本上看起来就像一个 ByteBuffer。我没有看到读写器之间发生任何协调,也没有任何方法对缓冲区设置大小限制。你怎么知道 DataBuffer 是响应式的(或多线程协调的)和大小有界的? 没关系,显然流会根据需要生成尽可能多的 DataBuffers,每个 DataBuffers 都包含一个响应数据块。不确定大小是如何确定的,可能在 Netty 配置中。每个 DataBuffer 在发送到 Flux 时都是完整的,因此不需要协调。【参考方案2】:

将正文存储到临时文件并使用

static <R> Mono<R> writeBodyToTempFileAndApply(
        final WebClient.ResponseSpec spec,
        final Function<? super Path, ? extends R> function) 
    return using(
            () -> createTempFile(null, null),
            t -> write(spec.bodyToFlux(DataBuffer.class), t)
                    .thenReturn(function.apply(t)),
            t -> 
                try 
                    deleteIfExists(t);
                 catch (final IOException ioe) 
                    throw new RuntimeException(ioe);
                
            
    );

管体和消耗

static <R> Mono<R> pipeBodyAndApply(
        final WebClient.ResponseSpec spec, final ExecutorService executor,
        final Function<? super ReadableByteChannel, ? extends R> function) 
    return using(
            Pipe::open,
            p -> 
                final Future<Disposable> future = executor.submit(
                        () -> write(spec.bodyToFlux(DataBuffer.class), p.sink())
                                .log()
                                .doFinally(s -> 
                                    try 
                                        p.sink().close();
                                        log.debug("p.sink closed");
                                     catch (final IOException ioe) 
                                        throw new RuntimeException(ioe);
                                    
                                )
                                .subscribe(DataBufferUtils.releaseConsumer())
                );
                return just(function.apply(p.source()))
                        .log()
                        .doFinally(s -> 
                            try 
                                final Disposable disposable = future.get();
                                assert disposable.isDisposed();
                             catch (InterruptedException | ExecutionException e) 
                                e.printStackTrace();
                            
                        );
            ,
            p -> 
                try 
                    p.source().close();
                    log.debug("p.source closed");
                 catch (final IOException ioe) 
                    throw new RuntimeException(ioe);
                
            
    );

【讨论】:

这个答案不是***.com/a/56096484/839733的重复吗? @AbhijitSarkar 是的。【参考方案3】:

我不确定您在当前使用 spring 时是否可以访问RestTemplate,但这个对我有用。


RestTemplate restTemplate // = ...;

RequestCallback requestCallback = request -> request.getHeaders()
        .setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));

// Streams the response
ResponseExtractor<Void> responseExtractor = response -> 
    // Here I write the response to a file but do what you like
    Path path = Paths.get("http://some/path");
    Files.copy(response.getBody(), path);
    return null;
;
restTemplate.execute(URI.create("www.something.com"), HttpMethod.GET, requestCallback, responseExtractor);

【讨论】:

感谢您的回复。我看到代码与此处的代码相同:***.com/a/38664475/449652。但是,这不再适用于 Spring 5 或更高版本 - 请参阅此问题:github.com/spring-projects/spring-framework/issues/19448 原始发帖人说他想以正确的方式来做,Webclient 是非阻塞的。 RestTemplate 即将被弃用。【参考方案4】:

我无法测试以下代码是否有效地将webClient 有效负载的内容缓存在内存中。不过,我认为你应该从那里开始:

public Mono<Void> testWebClientStreaming() throws IOException 
    Flux<DataBuffer> stream = 
            webClient
                    .get().accept(MediaType.APPLICATION_OCTET_STREAM)
                    .retrieve()
            .bodyToFlux(DataBuffer.class);
    Path filePath = Paths.get("filename");
    AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(filePath, WRITE);
    return DataBufferUtils.write(stream, asynchronousFileChannel)
            .doOnNext(DataBufferUtils.releaseConsumer())
            .doAfterTerminate(() -> 
                try 
                    asynchronousFileChannel.close();
                 catch (IOException ignored)  
            ).then();

【讨论】:

仅供参考,这里有一个新孩子。见DataBufferUtils#write(Publisher<DataBuffer>, Path, OpenOption...)

以上是关于Spring WebClient:如何将大字节 [] 流式传输到文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Spring WebClient 获取响应 json

如何使用Spring WebClient同时进行多个调用?

使用 Spring Boot WebClient 时如何拦截请求

如何在 Spring WebClient 中拦截 http 流量?

如何在 Spring 响应式 WebClient 中返回 Kotlin Coroutines Flow

Spring 5 webflux如何在Webclient上设置超时