Flux 不会在“then”之前等待元素完成

Posted

技术标签:

【中文标题】Flux 不会在“then”之前等待元素完成【英文标题】:Flux does not wait for completion of elements before 'then' 【发布时间】:2020-10-16 20:50:40 【问题描述】:

我无法理解这个问题,我不确定我做错了什么

我想等待flux结束,然后返回serverResponse的mono

我已附上代码 sn-p,doOnNext 将填充 categoryIdToPrintRepository

我查看了如何在通量结束后返回单声道,发现“then”但仍然在处理 onNextSite 之前执行“then”方法,这导致错误:

java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry

我做错了什么?

 public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) 
        return Mono.just("start").flatMap(id ->
                Flux.fromIterable(appSettings.getSites())
                        .subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
                        .doOnNext(this::onNextSite)
                        .then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));

    

    private void onNextSite(Integer siteId) 
        IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> 
            Optional<SiteCatalogCategoryDTO> cacheData =
                    siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
            cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> /*do nothing already exist in cache*/,
                    () -> 
                    Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
                            .get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
                    catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
                            handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
            );
        );
    


    private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
                                          int siteId, int catalogId) 
        if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
            Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
                    .subscribe(mapSCC -> 
                        categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
                                "Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
                                        mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
                        siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
                    );
    

【问题讨论】:

【参考方案1】:

你做错了几件事,你不应该在你的应用程序中subscribe,并且你有 void 方法,除非在特定的地方,否则不应该在反应式编程中使用。

这里是一些示例代码:


// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() 
    Mono.just("Foo");


// complier error
doSomething().subscribe( ... );

您的应用程序是调用客户端的publisher,是订阅者,这就是为什么我们向调用客户端返回 Mono 或 Flux,它们是 subscribe

你已经这样解决了:

private void doSomething() 
    Mono.just("Foo").subscribe( ... );


doSomething();

现在您正在订阅自己以使事情运行,这不是正确的方式,如前所述,调用客户端是订阅者,而不是您。

正确方法:

private Mono<String> doSomething() 
    return Mono.just("Foo");


// This is returned out to the calling client, they subscribe
return doSomething();

当 Mono/Flux 完成时,它会发出一个信号,这个信号将触发链中的下一个和下一个。

所以我对你需要做的事情的看法如下:

删除all subscribes,如果你想做一些事情,有flatmapmapdoOnSuccess等函数。保持链一直保持不变客户。 删除所有 void 函数,确保它们返回 FluxMono,如果你想返回一些东西,使用返回 Mono&lt;Void&gt; Mono.empty() 函数,这样链就完成了。

一旦您使用 Mono/Flux,您需要处理返回,以便其他人可以链接。

更新:

为了触发then,你必须返回一些东西,它会在之前的单声道/通量完成时返回。

示例:

private Flux<String> doSomething() 
    return Flux.just("Foo", "Bar", "FooBar")
               .doOnNext(string -> 
                   return // return something
               );


// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );

【讨论】:

我理解你的remakrs,但问题仍然存在,我不想返回通量,我想完成通量执行然后返回一个单声道作为对客户端的响应,如何这可以在 spring webflux 中完成吗? 阅读then(Mono&lt;V&gt; other) 的文档,它声明Let this Flux complete then play signals from a provided Mono,这意味着如果您在通量完成时返回Flux(无论您希望它做什么),它将返回您放入的内容你的then you should not subscribe in your application,为什么不呢? void 方法也适用于 doOnNext,它应该只用于副作用。问题是他们预计副作用会影响原始 Mono 链。此外,doOnNext 的 AFAIK 没有方法签名允许您返回任何内容,因为它只需要一个消费者。 因为调用客户端失去了控制背压的能力。而且我从未声称 doOnNext 没有采用 void 方法,所以请在发表评论之前阅读。 您无法控制doOnNext 中的背压,因此不确定您的观点。你也说You are doing several things wrong ... you are having void methods,但是 OP 在 doOnNext 中使用它们,那么这又是怎么回事?我读错了什么?

以上是关于Flux 不会在“then”之前等待元素完成的主要内容,如果未能解决你的问题,请参考以下文章

等待最后一个元素时阻塞 Flux

淘汰赛:在等待查询完成时加载页面

在将焦点转移到某个元素之前,如何等待公告完成?

在执行下一行代码之前等待超时完成

Javascript for 循环不等待 .then 完成

Ember 2,验收测试,websocket 挂起和 Then() 等待挂起完成