如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream

Posted

技术标签:

【中文标题】如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream【英文标题】:How to correctly read Flux<DataBuffer> and convert it to a single inputStream 【发布时间】:2018-03-09 16:37:43 【问题描述】:

我正在为我的 spring-boot 应用程序使用 WebClient 和自定义 BodyExtractorclass

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> 
     return response.body(new BodyExtractor());
   )

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) 
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> 
    try 
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
     catch(Exception e)
       return null;
    
  ).next();

上面的代码适用于小负载,但不适用于大负载,我认为这是因为我只使用next 读取单个通量值,我不确定如何组合和读取所有dataBuffer

我是 reactor 的新手,所以我不知道很多有关通量/单声道的技巧。

【问题讨论】:

【参考方案1】:

我能够通过使用 Flux#collectSequenceInputStream 使其工作

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) 
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
    .map(inputStream -> 
      try 
        JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(inputStream);
       catch(Exception e)
        return null;
      
  ).next();

InputStreamCollector.java

public class InputStreamCollector 
  private InputStream is;

  public void collectInputStream(InputStream is) 
    if (this.is == null) this.is = is;
    this.is = new SequenceInputStream(this.is, is);
  

  public InputStream getInputStream() 
    return this.is;
  

【讨论】:

您为什么要编写自己的 BodyExtractor? WebFlux 已经通过 Jaxb2XmlDecoder 支持 Jaxb。 @BrianClozel 我需要配置一些东西才能让它工作吗? bodyToMono 似乎没有接受我的 pojo。 InputStreamCollector 是什么? 很有趣,但WebClient 是这个工作的错误工具。您正在重建响应 InputStream,因此您没有使用 WebClient 的优势。最好使用普通的 HTTP 客户端。 这个解决方案不是将所有响应体读入内存吗? ByteBuffer 将所有数据存储在内存中,对吗?因此生成的InputStream 将与ByteArrayInputStream 相同,因此此解决方案不处理大数据。【参考方案2】:

Bk Santiago 答案的略微修改版本使用reduce() 而不是collect()。非常相似,但不需要额外的类:

Java:

body.reduce(new InputStream() 
    public int read()  return -1; 
  , (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */

或 Kotlin:

body.reduce(object : InputStream() 
  override fun read() = -1
)  s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) 
  .flatMap  inputStream -> /* do something with single InputStream */ 

与使用 collect() 相比,这种方法的好处是您无需使用不同的类来收集信息。

我创建了一个新的空InputStream(),但如果该语法令人困惑,您也可以将其替换为ByteArrayInputStream("".toByteArray()),以创建一个空的ByteArrayInputStream 作为您的初始值。

【讨论】:

你可以用InputStream.nullInputStream()代替new InputStream() public int read() return -1; 【参考方案3】:

你可以使用管道。

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source, final Executor executor,
        final Function<? super ReadableByteChannel, ? extends R> function) 
    return using(Pipe::open,
                 p -> 
                     executor.execute(() -> write(source, p.sink())
                             .doFinally(s -> 
                                 try 
                                     p.sink().close();
                                  catch (final IOException ioe) 
                                     log.error("failed to close pipe.sink", ioe);
                                     throw new RuntimeException(ioe);
                                 
                             )
                             .subscribe(releaseConsumer()));
                     return just(function.apply(p.source()));
                 ,
                 p -> 
                     try 
                         p.source().close();
                      catch (final IOException ioe) 
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     
                 );

或者使用CompletableFuture

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source,
        final Function<? super ReadableByteChannel, ? extends R> function) 
    return using(Pipe::open,
                 p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
                         .doFirst(() -> write(source, p.sink())
                                 .doFinally(s -> 
                                     try 
                                         p.sink().close();
                                      catch (final IOException ioe) 
                                         log.error("failed to close pipe.sink", ioe);
                                         throw new RuntimeException(ioe);
                                     
                                 )
                                 .subscribe(releaseConsumer())),
                 p -> 
                     try 
                         p.source().close();
                      catch (final IOException ioe) 
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     
                 );

【讨论】:

【参考方案4】:

这确实不像其他答案所暗示的那么复杂。

正如@jin-kwon 所建议的那样,流式传输数据而不将其全部缓冲在内存中的唯一方法是使用管道。但是,使用 Spring 的 BodyExtractors 和 DataBufferUtils 实用程序类可以非常简单地完成。

例子:

private InputStream readAsInputStream(String url) throws IOException 
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION.XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> 
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) 
              //no-op
             catch (IOException ioe) 
                log.error("Error closing streams", ioe);
            
        )
        .doFinally(s -> 
            try(osPipe) 
              //no-op
             catch (IOException ioe) 
                log.error("Error closing streams", ioe);
            
        );

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;

如果您不关心检查响应代码或针对失败状态代码抛出异常,您可以通过使用跳过block() 调用和中间ClientResponse 变量

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

改为。

【讨论】:

看起来很有前途且简单,这可能是处理大型请求的正确答案。如果我有时间,我会试试这个。 我应该补充一点,我同意@abhijit-sarkar 早先的comment,即WebClient 不是这项工作的最佳工具。虽然它可以做到(正如我已经证明的那样),但这并不是最有效的方法。如果您只需要一个InputStream,您最好使用同步客户端,例如java.net.http.HttpClient。如果您坚持使用WebClient,那么我相信我的解决方案是最佳选择。 如果没有错误,isPipe 似乎永远不会关闭 PipedInputSteam 更改为PipedInputStreamMediaType.APPLICATION.XML 更改为MediaType.APPLICATION_XML。我摆脱了状态码,所以我需要使用flatMapMany(r -&gt; r.body(BodyExtractors.toDataBuffers())) 而不是flatMap(r -&gt; r.body(BodyExtractors.toDataBuffers())) 使用 reactor-core 3.3.9.RELEASE 的 Java 8 无效。 PipedInputStream 和 PipedOutputStream 只包含没有终止的 0。它将我的解组器挂在调用 unmarshaller.unmarshal(isPipe) 中。事实上,在我的调试器中,doFinally 永远不会被调用,这是可疑的【参考方案5】:

这是其他答案的另一个变体。而且它仍然对内存不友好。

static Mono<InputStream> asStream(WebClient.ResponseSpec response) 
    return response.bodyToFlux(DataBuffer.class)
        .map(b -> b.asInputStream(true))
        .reduce(SequenceInputStream::new);


static void doSome(WebClient.ResponseSpec response) 
    asStream(response)
        .doOnNext(stream -> 
            // do some with stream
            // close the stream!!!
        )
        .block();

【讨论】:

处理小文件时超级简单。 @Tires 我真的怀疑DataBuffer::asInputStream。见asInputStream() @JinKwon 你是对的。我想知道为什么我之前没有看到关于未释放缓冲区的 Netty 警告 小心。如果你关闭 SequenceInputStream(否则你会从 Netty 得到未释放的缓冲区错误),那么如果你有一个大文件或很多小缓冲区,它很容易导致 ***Error。【参考方案6】:

有一种更简洁的方法可以直接使用底层 reactor-netty HttpClient,而不是使用 WebClient。组合层次是这样的:

WebClient -uses-> HttpClient -uses-> TcpClient

显示代码比解释更容易:

HttpClient.create()
    .get()
    .responseContent() // ByteBufFlux
    .aggregate() // ByteBufMono
    .asInputStream() // Mono<InputStream>
    .block() // We got an InputStream, yay!

但是,正如我已经指出的那样,使用 InputStream 是一种阻塞操作,它违背了使用非阻塞 HTTP 客户端的目的,更不用说聚合整个响应了。有关 Java NIO 与 IO 的比较,请参阅 this。

【讨论】:

以上是关于如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream的主要内容,如果未能解决你的问题,请参考以下文章

spring webflux Flux<DataBuffer> 转换为 InputStream

在 webflux 中读取请求正文

如何在文件行出现并将它们表示为 Flux 时读取它们?

从块中读取 Flux<Integer>

如何在 WebFilter 上配置 Spring DataBuffer 大小

React.js + Flux - 正确初始化存储中的数据对象