如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream
Posted
技术标签:
【中文标题】如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream【英文标题】:How to correctly read Flux<DataBuffer> and convert it to a single inputStream 【发布时间】:2018-03-09 16:37:43 【问题描述】:我正在为我的 spring-boot 应用程序使用 WebClient
和自定义 BodyExtractor
class
WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()
.flatMap(response ->
return response.body(new BodyExtractor());
)
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context)
Flux<DataBuffer> body = response.getBody();
body.map(dataBuffer ->
try
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
catch(Exception e)
return null;
).next();
上面的代码适用于小负载,但不适用于大负载,我认为这是因为我只使用next
读取单个通量值,我不确定如何组合和读取所有dataBuffer
。
我是 reactor 的新手,所以我不知道很多有关通量/单声道的技巧。
【问题讨论】:
【参考方案1】:我能够通过使用 Flux#collect
和 SequenceInputStream
使其工作
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context)
Flux<DataBuffer> body = response.getBody();
return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
.map(inputStream ->
try
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(inputStream);
catch(Exception e)
return null;
).next();
InputStreamCollector.java
public class InputStreamCollector
private InputStream is;
public void collectInputStream(InputStream is)
if (this.is == null) this.is = is;
this.is = new SequenceInputStream(this.is, is);
public InputStream getInputStream()
return this.is;
【讨论】:
您为什么要编写自己的 BodyExtractor? WebFlux 已经通过 Jaxb2XmlDecoder 支持 Jaxb。 @BrianClozel 我需要配置一些东西才能让它工作吗?bodyToMono
似乎没有接受我的 pojo。
InputStreamCollector
是什么?
很有趣,但WebClient
是这个工作的错误工具。您正在重建响应 InputStream
,因此您没有使用 WebClient
的优势。最好使用普通的 HTTP 客户端。
这个解决方案不是将所有响应体读入内存吗? ByteBuffer
将所有数据存储在内存中,对吗?因此生成的InputStream
将与ByteArrayInputStream
相同,因此此解决方案不处理大数据。【参考方案2】:
Bk Santiago 答案的略微修改版本使用reduce()
而不是collect()
。非常相似,但不需要额外的类:
Java:
body.reduce(new InputStream()
public int read() return -1;
, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */
或 Kotlin:
body.reduce(object : InputStream()
override fun read() = -1
) s: InputStream, d -> SequenceInputStream(s, d.asInputStream())
.flatMap inputStream -> /* do something with single InputStream */
与使用 collect()
相比,这种方法的好处是您无需使用不同的类来收集信息。
我创建了一个新的空InputStream()
,但如果该语法令人困惑,您也可以将其替换为ByteArrayInputStream("".toByteArray())
,以创建一个空的ByteArrayInputStream
作为您的初始值。
【讨论】:
你可以用InputStream.nullInputStream()
代替new InputStream() public int read() return -1;
【参考方案3】:
你可以使用管道。
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source, final Executor executor,
final Function<? super ReadableByteChannel, ? extends R> function)
return using(Pipe::open,
p ->
executor.execute(() -> write(source, p.sink())
.doFinally(s ->
try
p.sink().close();
catch (final IOException ioe)
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
)
.subscribe(releaseConsumer()));
return just(function.apply(p.source()));
,
p ->
try
p.source().close();
catch (final IOException ioe)
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
);
或者使用CompletableFuture
,
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source,
final Function<? super ReadableByteChannel, ? extends R> function)
return using(Pipe::open,
p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
.doFirst(() -> write(source, p.sink())
.doFinally(s ->
try
p.sink().close();
catch (final IOException ioe)
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
)
.subscribe(releaseConsumer())),
p ->
try
p.source().close();
catch (final IOException ioe)
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
);
【讨论】:
【参考方案4】:这确实不像其他答案所暗示的那么复杂。
正如@jin-kwon 所建议的那样,流式传输数据而不将其全部缓冲在内存中的唯一方法是使用管道。但是,使用 Spring 的 BodyExtractors 和 DataBufferUtils 实用程序类可以非常简单地完成。
例子:
private InputStream readAsInputStream(String url) throws IOException
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);
ClientResponse response = webClient.get().uri(url)
.accept(MediaType.APPLICATION.XML)
.exchange()
.block();
final int statusCode = response.rawStatusCode();
// check HTTP status code, can throw exception if needed
// ....
Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
.doOnError(t ->
log.error("Error reading body.", t);
// close pipe to force InputStream to error,
// otherwise the returned InputStream will hang forever if an error occurs
try(isPipe)
//no-op
catch (IOException ioe)
log.error("Error closing streams", ioe);
)
.doFinally(s ->
try(osPipe)
//no-op
catch (IOException ioe)
log.error("Error closing streams", ioe);
);
DataBufferUtils.write(body, osPipe)
.subscribe(DataBufferUtils.releaseConsumer());
return isPipe;
如果您不关心检查响应代码或针对失败状态代码抛出异常,您可以通过使用跳过block()
调用和中间ClientResponse
变量
flatMap(r -> r.body(BodyExtractors.toDataBuffers()))
改为。
【讨论】:
看起来很有前途且简单,这可能是处理大型请求的正确答案。如果我有时间,我会试试这个。 我应该补充一点,我同意@abhijit-sarkar 早先的comment,即WebClient
不是这项工作的最佳工具。虽然它可以做到(正如我已经证明的那样),但这并不是最有效的方法。如果您只需要一个InputStream
,您最好使用同步客户端,例如java.net.http.HttpClient
。如果您坚持使用WebClient
,那么我相信我的解决方案是最佳选择。
如果没有错误,isPipe
似乎永远不会关闭
将PipedInputSteam
更改为PipedInputStream
和MediaType.APPLICATION.XML
更改为MediaType.APPLICATION_XML
。我摆脱了状态码,所以我需要使用flatMapMany(r -> r.body(BodyExtractors.toDataBuffers()))
而不是flatMap(r -> r.body(BodyExtractors.toDataBuffers()))
使用 reactor-core 3.3.9.RELEASE 的 Java 8 无效。 PipedInputStream 和 PipedOutputStream 只包含没有终止的 0。它将我的解组器挂在调用 unmarshaller.unmarshal(isPipe) 中。事实上,在我的调试器中,doFinally 永远不会被调用,这是可疑的【参考方案5】:
这是其他答案的另一个变体。而且它仍然对内存不友好。
static Mono<InputStream> asStream(WebClient.ResponseSpec response)
return response.bodyToFlux(DataBuffer.class)
.map(b -> b.asInputStream(true))
.reduce(SequenceInputStream::new);
static void doSome(WebClient.ResponseSpec response)
asStream(response)
.doOnNext(stream ->
// do some with stream
// close the stream!!!
)
.block();
【讨论】:
处理小文件时超级简单。 @Tires 我真的怀疑DataBuffer::asInputStream
。见asInputStream()
@JinKwon 你是对的。我想知道为什么我之前没有看到关于未释放缓冲区的 Netty 警告
小心。如果你关闭 SequenceInputStream(否则你会从 Netty 得到未释放的缓冲区错误),那么如果你有一个大文件或很多小缓冲区,它很容易导致 ***Error。【参考方案6】:
有一种更简洁的方法可以直接使用底层 reactor-netty HttpClient
,而不是使用 WebClient
。组合层次是这样的:
WebClient -uses-> HttpClient -uses-> TcpClient
显示代码比解释更容易:
HttpClient.create()
.get()
.responseContent() // ByteBufFlux
.aggregate() // ByteBufMono
.asInputStream() // Mono<InputStream>
.block() // We got an InputStream, yay!
但是,正如我已经指出的那样,使用 InputStream
是一种阻塞操作,它违背了使用非阻塞 HTTP 客户端的目的,更不用说聚合整个响应了。有关 Java NIO 与 IO 的比较,请参阅 this。
【讨论】:
以上是关于如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream的主要内容,如果未能解决你的问题,请参考以下文章
spring webflux Flux<DataBuffer> 转换为 InputStream