将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?
Posted
技术标签:
【中文标题】将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?【英文标题】:Is it correct to convert a CompletableFuture<Stream<T>> to a Publisher<T>?将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是否正确? 【发布时间】:2018-08-18 14:12:24 【问题描述】:为了允许对来自 CompletableFuture<Stream<String>>
的结果流进行多次迭代,我正在考虑以下方法之一:
通过teams.thenApply(st -> st.collect(toList()))
将生成的future转换为CompletableFuture<List<String>>
将生成的 future 转换为 Flux<String>
并使用缓存:Flux.fromStream(teams::join).cache();
Flux<T>
是Publisher<T>
在项目reactor中的实现。
用例:
我想从提供League
对象和Standing[]
的数据源中获取包含英超球队名称(例如Stream<String>
)的序列(基于足球数据RESTful API,例如@987654321 @)。使用AsyncHttpClient
和Gson
我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
要重用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Flux<T>
不那么冗长,并提供了我需要的一切。然而,在这种情况下使用它是否正确?
或者我应该改用CompletableFuture<List<String>>
吗?或者还有其他更好的选择吗?
更新了一些想法(2018-03-16):
CompletableFuture<List<String>>
:
List<String>
将继续收集,当我们需要继续处理未来的结果时,它可能已经完成。
[CONS] 声明冗长。
[缺点] 如果我们只想使用一次,那么我们不需要在List<T>
中收集这些项目。
Flux<String>
:
.cache()
并将其转发到下一层,这可以利用响应式 API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() …
[缺点] 如果我们想重用 Flux<T>
,我们必须将其包装在可缓存的 Flux<T>
(….cache()
) 中,这反过来会增加第一次遍历的开销,因为它必须存储结果项目在内部缓存中。
【问题讨论】:
“这反过来会增加第一次遍历的开销” - 可以忽略不计,忽略这个。Flux
是一个异步响应式管道。 List
是 List
。您需要什么?您正在将苹果与橙子进行比较。
@BoristheSpider 我没有将List
与Flux
进行比较。我将CF<List>
与Flux
进行比较。
那是Mono<List<T>>
不是Flux<T>
。应该很明显两者是不同的。
Mono<List<T>>
与 CF<List<T>>
相同。从CF<List<T>>
转换为Mono<List<T>>
没有优势。
【参考方案1】:
CompletableFuture<Stream<String>> teams = ...;
Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
Flux.fromStream(teams::join)
是一种代码异味,因为它阻塞了一个线程以从另一个线程上运行的 CompletableFuture
获取结果。
【讨论】:
我认为它不包含调用fromStream(...)
的线程。那是懒惰。我们只是将一个方法句柄传递给join
,而不是调用它。订阅后,推送线程可能会保留在 join()
上,但前提是源 CompletableFuture
尚未完成。【参考方案2】:
一旦您下载了联赛表,并且从该表中提取了球队名称,我不确定您是否需要一个准备好背压的流来迭代这些项目。将流转换为标准列表(或数组)应该已经足够好了,并且应该有更好的性能,不是吗?
例如:
String[] teamNames = teams.join().toArray(String[]::new);
【讨论】:
我不确定管理费用。向List
收集物品也会产生额外的开销,不是吗?如果我们不总是重用生成的teams
,那么从Supplier::stream
创建一个冷流 可能会更有效,因为我们没有创建一个辅助List
对象。另一方面,如果我们确定要重用teams
,那么将它收集在List
中可能比使用cache()
更好,因为后者会在第一次遍历时增加开销。这些是我的疑问!!
确实,如果您的意图是读取一次流,您可以保留一个流;但如果您必须多次解析项目集合,我更愿意将项目流转换为项目数组,而不是使用背压就绪的冷流来迭代这些项目。
不确定将异步管道转换为同步管道是我见过的最佳建议。以上是关于将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?的主要内容,如果未能解决你的问题,请参考以下文章
将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是不是正确?
返回 CompletableFuture<Void> 还是 CompletableFuture<?>?