spring webflux Flux<DataBuffer> 转换为 InputStream

Posted

技术标签:

【中文标题】spring webflux Flux<DataBuffer> 转换为 InputStream【英文标题】:spring webflux Flux<DataBuffer> convert to InputStream 【发布时间】:2019-01-11 14:25:10 【问题描述】:

我目前正在研究 Spring WebFlux。 我正在尝试使用 Spring WebFlux 上传大文件 (70mo)。

我的控制器

@RequestMapping(method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<String> uploadHandler(@RequestBody Flux<Part> fluxParts, @RequestParam(value = "categoryType") String categoryType, @PathVariable(value = "traceabilityReportUuid") String traceabilityUuid) 
    return documentHandler.upload(fluxParts, UUID.fromString(traceabilityUuid), categoryType);

我的服务

public Flux<String> upload(Flux<Part> fluxParts, UUID traceabilityUuid, String categoryType) 

    return fluxParts
            .filter(part -> part instanceof FilePart)
            .ofType(FilePart.class)
            .flatMap(p -> this.upload(p, traceabilityUuid, categoryType));





private Mono<String> upload(FilePart filePart, UUID traceabilityUuid, String categoryType) 

    return filePart.content().collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
            .flatMap(inputStreamCollector -> 
                upload(traceabilityUuid, inputStreamCollector.getInputStream(), filePart.filename(), categoryType);

                return Mono.just("OK");
            );

我的收藏家

public class InputStreamCollector 

    private InputStream is;

    public void collectInputStream(InputStream is) 
        if (this.is == null) this.is = is;
        this.is = new SequenceInputStream(this.is, is);
    

    public InputStream getInputStream() 
        return this.is;
    

最后,我通过这种方式检索完整的输入流:inputStreamCollector.getInputStream() 并传递给我的对象。

我使用这个对象来发送到存储桶 S3。

但在发送到 S3 之前,我必须将其转换为文件(使用 apache 工具),我有一个 *** 异常。

java.lang.***Error: null
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)

它适用于小文件(7mo ..)

您有什么想法可以解决我的问题吗?

【问题讨论】:

【参考方案1】:

我终于找到了解决办法!

https://github.com/entzik/reactive-spring-boot-examples/blob/master/src/main/java/com/thekirschners/springbootsamples/reactiveupload/ReactiveUploadResource.java

我修改了代码以返回 InputStream,它适用于大文件;-)

【讨论】:

最后,这不是好的解决方案 :) 最好的是blog.davidvassallo.me/2018/07/09/… 您最终使用的是多部分/文件,而不是缓冲区? 注意 Spring 从请求正文中同步解析 FilePart。将大文件作为 1 个多部分/文件上传时,首先将数据传输到临时文件: System.getProperty("java.io.tmpdir") + "/nio-file-upload/nio-body-- .tmp”。当上传完成 Flux 开始发出事件。我认为 Flux 在上传多个小文件时很有用【参考方案2】:

要将 DataBuffer 转换为 StringList,您可以使用 Apache IOUtils .在这个示例中,我返回了一个 Flux,为了避免 try/catch 我用 Mono.fromCallable 包裹。

protected Flux<String> getLines(final DataBuffer dataBuffer) 
    return Mono.fromCallable(() -> IOUtils.readLines(dataBuffer.asInputStream(), Charsets.UTF_8))
        .flatMapMany(Flux::fromIterable);

【讨论】:

在非阻塞框架中使用阻塞 API 是不可取的。【参考方案3】:

本示例将帮助您了解如何从 FilePart 加载数据:

public static Mono<String> readBase64Content(FilePart filePart) 
    return filePart.content().flatMap(dataBuffer -> 
         byte[] bytes = new byte[dataBuffer.readableByteCount()];
         dataBuffer.read(bytes);
         String content = Base64.getEncoder().encodeToString(bytes);
         return Mono.just(content);
    ).last();

休息法

@PostMapping(value = "/person/personId/photo", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
Mono<String> uploadPhoto(@PathVariable Long personId, @RequestPart("photo") Mono<FilePart> photo) 
        return photo.ofType(FilePart.class).flatMap(StringUtil::readBase64Content);

【讨论】:

以上是关于spring webflux Flux<DataBuffer> 转换为 InputStream的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?

Spring WebFlux 休息控制器仅服务于前两个订阅

从 Spring WebFlux 返回 Flux<String> 返回一个字符串而不是 JSON 中的字符串数组

Spring WebFlux(Flux):如何动态发布

Spring Webflux(Mono/Flux) 与 AOP 在拦截时触发 REST 调用并使用 Mono/Flux

Log Spring webflux 类型 - Mono 和 Flux