在 Spring WebFlux 中使用 .repeatWhen() 基于 DB 资源重复订阅

Posted

技术标签:

【中文标题】在 Spring WebFlux 中使用 .repeatWhen() 基于 DB 资源重复订阅【英文标题】:Repeatedly subscribe based on DB resource with .repeatWhen() in Spring WebFlux 【发布时间】:2022-01-03 23:47:16 【问题描述】:

我想根据数据库中某些资源的状态来实现 Flux process() 的重复。例如,如果资源中的元素数组不为空,则重复 process()。看起来运营商 repeatWhen 将适合我的目的 - 允许订阅具有资源的发布者。这是一个代码sn-p:

private Consumer<Signal<String>> processOnNewThread() 
    return signal -> 
        final var resourceId = signal.get();
        if (resourceId == null) return;

        this.process(resourceId)
            .repeatWhen(repeat -> Mono.defer(() -> repo.findById(resourceId)
                                                       // filter to end repeat
                                                       .filter(r -> !r.getElems().isEmpty())
                                                       // return Mono with complete signal to repeat
                                                       .map(r -> r.getElems().size())))
            .collectList()
            .contextWrite(stateSignal.getContextView())
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    ;


private Flux<String> process(String resourceId)  ...  

这段代码有两个问题:

    repo.findById(resourceId) 在 process() 方法之前执行,尽管 Mono.defer() 当 elems 为空时,重复序列以空信号结束,这不会导致重复结束,而是整个过程结束

关于如何检查新资源然后继续或完成重复的任何想法?

【问题讨论】:

【参考方案1】:

我通过如下更改运算符 .repeatWhen 设法达到了预期的结果:

.repeatWhen(repeat -> repeat.flatMap(r -> Mono.defer(() -> repo.findById(resourceId)
                                                               .map(r -> r.getElems().size())))
                            .handle((nextRepeat, sink) -> 
                                // if elem size > 0 - repeat process
                                if (nextRepeat > 0) sink.next(nextRepeat);
                                else sink.complete();
                            ))

Flux repeat 用于进一步的链允许操作员Mono.defer() 正确执行并在每次重复检查时获得新资源。 handle() 操作符直接执行续订的延长或结束。因此,这解决了我遇到的问题

【讨论】:

以上是关于在 Spring WebFlux 中使用 .repeatWhen() 基于 DB 资源重复订阅的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Boot WebFlux 中使用 GET 请求注销

使用 Spring Webflux 返回元素列表

如何在 Spring webflux 应用程序中使用 Spring WebSessionIdResolver 和 Spring Security 5?

如何在 Spring Boot 和 Spring WebFlux 中使用“功能 bean 定义 Kotlin DSL”?

如何在 spring-webflux RouterFunction 端点中使用 OpenApi 注释?

使用 Spring boot + WebFlux 进行全局错误处理