Spring & Reactor:使用来自 repo 的 Flux 订阅两个参数的二进制函数

Posted

技术标签:

【中文标题】Spring & Reactor:使用来自 repo 的 Flux 订阅两个参数的二进制函数【英文标题】:Spring & Reactor: Subscribe to a binary function of two arguments with Flux from repo 【发布时间】:2021-07-23 06:49:29 【问题描述】:

假设我有一个定义如下的二元函数:

Mono<ObjectA> someBinaryFunction(String someString, enum someEnum)

并且我想在一些操作后使用 Spring 存储库中的通量来订阅此功能:

    repo.findAll()
            .filter( item -> itemField < someValue)
            .flatMapSequential( item ->
              Flux.just(item.getValue1(), enum.ENUMVALUE)
              ).subscribe(baseClass::someBinaryFunction)

如何实现这一点,使用 Flux 订阅具有 N 个参数的函数的最佳实践是什么?我的第一个直觉是像上面那样使用sequentialFlatMap,但这失败了。

【问题讨论】:

【参考方案1】:

如果最初的意图是将项目的通量转换为像 (item, enumVal) 这样的对的通量,则解决方案可能如下:

repo.findAll()
        .filter( item -> itemField < someValue)
        .flatMapSequential( item ->
          Flux.just(Tuples.of(item.getValue1(), enum.ENUMVALUE))
          ).subscribe(tuple->baseClass.someBinaryFunction(tuple.getT1(), tuple.getT2()))

Tuples.of(...) 方法创建 Tuple2 类型对,然后创建 tuple.getT1() strong> 和 tuple.getT2() 提取这对的第一个和第二个成员,以作为 someBinaryFunction() 参数使用。

【讨论】:

虽然此代码可能会回答问题,但添加额外的解释可能会对 OP 和未来的读者有所帮助。

以上是关于Spring & Reactor:使用来自 repo 的 Flux 订阅两个参数的二进制函数的主要内容,如果未能解决你的问题,请参考以下文章

使用 reactor netty 为 spring-webflux WebClient 配置 HostnameVerifier

(12)Reactor 3 自定义数据流——响应式Spring的道法术器

(14)Reactor调度器与线程模型——响应式Spring的道法术器

使用 Reactor Netty 配置 Spring Boot 以监听 2 个端口

Spring WebFlux 使用 RSocket:Kotlin 协程 Flow 与 Reactor Flux 消息格式

基于spring reactor3构建的即时通讯api。