如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?

Posted

技术标签:

【中文标题】如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?【英文标题】:How to make Spring Cloud Stream consumer in Webflux application? 【发布时间】:2021-03-26 06:39:57 【问题描述】:

我有一个基于 Webflux 的微服务,它有一个简单的响应式存储库:

    public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> 
    

现在我想扩展这个微服务来使用来自 Kafka 的事件消息。然后此消息/事件将保存到数据库中。

对于 Kafka 监听器,我使用了 Spring Cloud Stream。我创建了一些简单的 Consumer,它运行良好 - 我能够使用消息并将其保存到数据库中。

    @Bean
    public Consumer<KStream<String, Event>> documents(NotificationRepository repository) 
        return input ->
                input.foreach((key, value) -> 
                    LOG.info("Received event, Key: , value: ", key, value);
                    repository.save(initNotification(value)).subscribe();
                );
    

但这是连接 Spring Cloud Stream 消费者和响应式存储库的正确方法吗?当我最后不得不打电话给subscribe()时,它看起来不像。

我读了Spring Cloud Stream documentation (for 3.0.0 release),他们说

Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.

在this presentation video 中,他们提到他们使用项目反应器支持反应式编程。所以我想有一种我不知道的方法。你能告诉我怎么做吗?

如果这一切听起来太愚蠢,我深表歉意,但我对 Spring Cloud Stream 和反应式编程非常陌生,还没有找到很多描述这一点的文章。

【问题讨论】:

【参考方案1】:

只需使用 Flux 作为消费类型,如下所示:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) 
    return input ->
            input
             .map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
             .concatMap((value) -> repository.save(initNotification(value)))
             .subscribe();

如果您使用带有空返回类型 (Function&lt;Flux&lt;Message&lt;Event&gt;&gt;, Mono&lt;Void&gt;&gt;) 的 Function 而不是 Consumer,则框架可以自动订阅。对于Consumer,您必须手动订阅,因为框架没有对流的引用。但是在Consumer 的情况下,您订阅的不是存储库而是整个流,这没关系。

【讨论】:

感谢您的回复,这正是我所需要的。但我试过了,不幸的是它没有用,消息被消耗但从未保存到数据库中。你不是说用 flatMap 代替 doOnNext 吗? 对不起。 concatMap 或 flatMap 你需要什么,我修改答案

以上是关于如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spring boot webflux 上从 mono<user> 获取用户名?

SpringBoot WebFlux - 制作并行 WebClient 请求

如何在 Spring Boot WebFlux 中使用 GET 请求注销

如何使用 Webflux 在 Spring API 处理程序方法中访问 JWT 声明?

Webflux WebClient中如何自定义异常?

如何在 OAuth2 WebFlux 中设置成功和失败处理程序