Flux 不会在“then”之前等待元素完成
Posted
技术标签:
【中文标题】Flux 不会在“then”之前等待元素完成【英文标题】:Flux does not wait for completion of elements before 'then' 【发布时间】:2020-10-16 20:50:40 【问题描述】:我无法理解这个问题,我不确定我做错了什么
我想等待flux结束,然后返回serverResponse的mono
我已附上代码 sn-p,doOnNext 将填充 categoryIdToPrintRepository
我查看了如何在通量结束后返回单声道,发现“then”但仍然在处理 onNextSite 之前执行“then”方法,这导致错误:
java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry
我做错了什么?
public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored)
return Mono.just("start").flatMap(id ->
Flux.fromIterable(appSettings.getSites())
.subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
.doOnNext(this::onNextSite)
.then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));
private void onNextSite(Integer siteId)
IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId ->
Optional<SiteCatalogCategoryDTO> cacheData =
siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> /*do nothing already exist in cache*/,
() ->
Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
.get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
);
);
private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
int siteId, int catalogId)
if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
.subscribe(mapSCC ->
categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
"Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
);
【问题讨论】:
【参考方案1】:你做错了几件事,你不应该在你的应用程序中subscribe
,并且你有 void 方法,除非在特定的地方,否则不应该在反应式编程中使用。
这里是一些示例代码:
// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething()
Mono.just("Foo");
// complier error
doSomething().subscribe( ... );
您的应用程序是调用客户端的publisher
,是订阅者,这就是为什么我们向调用客户端返回 Mono 或 Flux,它们是 subscribe
。
你已经这样解决了:
private void doSomething()
Mono.just("Foo").subscribe( ... );
doSomething();
现在您正在订阅自己以使事情运行,这不是正确的方式,如前所述,调用客户端是订阅者,而不是您。
正确方法:
private Mono<String> doSomething()
return Mono.just("Foo");
// This is returned out to the calling client, they subscribe
return doSomething();
当 Mono/Flux 完成时,它会发出一个信号,这个信号将触发链中的下一个和下一个。
所以我对你需要做的事情的看法如下:
删除allsubscribes
,如果你想做一些事情,有flatmap
,map
,doOnSuccess
等函数。保持链一直保持不变客户。
删除所有 void 函数,确保它们返回 Flux
或 Mono
,如果你想不返回一些东西,使用返回 Mono<Void>
Mono.empty()
函数,这样链就完成了。
一旦您使用 Mono/Flux,您需要处理返回,以便其他人可以链接。
更新:
为了触发then
,你必须返回一些东西,它会在之前的单声道/通量完成时返回。
示例:
private Flux<String> doSomething()
return Flux.just("Foo", "Bar", "FooBar")
.doOnNext(string ->
return // return something
);
// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );
【讨论】:
我理解你的remakrs,但问题仍然存在,我不想返回通量,我想完成通量执行然后返回一个单声道作为对客户端的响应,如何这可以在 spring webflux 中完成吗? 阅读then(Mono<V> other)
的文档,它声明Let this Flux complete then play signals from a provided Mono
,这意味着如果您在通量完成时返回Flux
(无论您希望它做什么),它将返回您放入的内容你的then
。
you should not subscribe in your application
,为什么不呢? void 方法也适用于 doOnNext
,它应该只用于副作用。问题是他们预计副作用会影响原始 Mono 链。此外,doOnNext
的 AFAIK 没有方法签名允许您返回任何内容,因为它只需要一个消费者。
因为调用客户端失去了控制背压的能力。而且我从未声称 doOnNext 没有采用 void 方法,所以请在发表评论之前阅读。
您无法控制doOnNext
中的背压,因此不确定您的观点。你也说You are doing several things wrong ... you are having void methods
,但是 OP 在 doOnNext 中使用它们,那么这又是怎么回事?我读错了什么?以上是关于Flux 不会在“then”之前等待元素完成的主要内容,如果未能解决你的问题,请参考以下文章