将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?

Posted

技术标签:

【中文标题】将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?【英文标题】:Is it correct to convert a CompletableFuture<Stream<T>> to a Publisher<T>?将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是否正确? 【发布时间】:2018-08-18 14:12:24 【问题描述】:

为了允许对来自 CompletableFuture&lt;Stream&lt;String&gt;&gt; 的结果流进行多次迭代,我正在考虑以下方法之一:

    通过teams.thenApply(st -&gt; st.collect(toList()))将生成的future转换为CompletableFuture&lt;List&lt;String&gt;&gt;

    将生成的 future 转换为 Flux&lt;String&gt; 并使用缓存:Flux.fromStream(teams::join).cache();

Flux&lt;T&gt;Publisher&lt;T&gt;在项目reactor中的实现。

用例:

我想从提供League 对象和Standing[] 的数据源中获取包含英超球队名称(例如Stream&lt;String&gt;)的序列(基于足球数据RESTful API,例如@987654321 @)。使用AsyncHttpClientGson 我们有:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));

要重用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()

Flux&lt;T&gt; 不那么冗长,并提供了我需要的一切。然而,在这种情况下使用它是否正确?

或者我应该改用CompletableFuture&lt;List&lt;String&gt;&gt; 吗?或者还有其他更好的选择吗?

更新了一些想法(2018-03-16)

CompletableFuture&lt;List&lt;String&gt;&gt;:

[PROS]List&lt;String&gt; 将继续收集,当我们需要继续处理未来的结果时,它可能已经完成。 [CONS] 声明冗长。 [缺点] 如果我们只想使用一次,那么我们不需要在List&lt;T&gt; 中收集这些项目。

Flux&lt;String&gt;:

[PROS] 声明简洁 [PROS] 如果我们只想使用一次,那么我们可以省略 .cache() 并将其转发到下一层,这可以利用响应式 API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux&lt;String&gt; getTeams() … [缺点] 如果我们想重用 Flux&lt;T&gt;,我们必须将其包装在可缓存的 Flux&lt;T&gt; (….cache()) 中,这反过来会增加第一次遍历的开销,因为它必须存储结果项目在内部缓存中。

【问题讨论】:

这反过来会增加第一次遍历的开销” - 可以忽略不计,忽略这个。 Flux 是一个异步响应式管道。 ListList。您需要什么?您正在将苹果与橙子进行比较。 @BoristheSpider 我没有将ListFlux 进行比较。我将CF&lt;List&gt;Flux 进行比较。 那是Mono&lt;List&lt;T&gt;&gt; 不是Flux&lt;T&gt;。应该很明显两者是不同的。 Mono&lt;List&lt;T&gt;&gt;CF&lt;List&lt;T&gt;&gt; 相同。从CF&lt;List&lt;T&gt;&gt; 转换为Mono&lt;List&lt;T&gt;&gt; 没有优势。 【参考方案1】:
    CompletableFuture<Stream<String>> teams = ...;
    Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));

Flux.fromStream(teams::join) 是一种代码异味,因为它阻塞了一个线程以从另一个线程上运行的 CompletableFuture 获取结果。

【讨论】:

我认为它不包含调用fromStream(...) 的线程。那是懒惰。我们只是将一个方法句柄传递给join,而不是调用它。订阅后,推送线程可能会保留在 join() 上,但前提是源 CompletableFuture 尚未完成。【参考方案2】:

一旦您下载了联赛表,并且从该表中提取了球队名称,我不确定您是否需要一个准备好背压的流来迭代这些项目。将流转换为标准列表(或数组)应该已经足够好了,并且应该有更好的性能,不是吗?

例如:

String[] teamNames = teams.join().toArray(String[]::new);

【讨论】:

我不确定管理费用。向List 收集物品也会产生额外的开销,不是吗?如果我们不总是重用生成的teams,那么从Supplier::stream 创建一个冷流 可能会更有效,因为我们没有创建一个辅助List 对象。另一方面,如果我们确定要重用teams,那么将它收集在List 中可能比使用cache() 更好,因为后者会在第一次遍历时增加开销。这些是我的疑问!! 确实,如果您的意图是读取一次流,您可以保留一个流;但如果您必须多次解析项目集合,我更愿意将项目流转换为项目数组,而不是使用背压就绪的冷流来迭代这些项目。 不确定将异步管道转换为同步管道是我见过的最佳建议。

以上是关于将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?的主要内容,如果未能解决你的问题,请参考以下文章

将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?

CompletableFuture进阶

返回 CompletableFuture<Void> 还是 CompletableFuture<?>?

thenApply()和thenCompose()的区别

如何将异步CompletableFuture与完成的CompletableFuture结合起来?

CompletableFuture: 分析一