如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?
Posted
技术标签:
【中文标题】如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?【英文标题】:How to make Spring Cloud Stream consumer in Webflux application? 【发布时间】:2021-03-26 06:39:57 【问题描述】:我有一个基于 Webflux 的微服务,它有一个简单的响应式存储库:
public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId>
现在我想扩展这个微服务来使用来自 Kafka 的事件消息。然后此消息/事件将保存到数据库中。
对于 Kafka 监听器,我使用了 Spring Cloud Stream。我创建了一些简单的 Consumer,它运行良好 - 我能够使用消息并将其保存到数据库中。
@Bean
public Consumer<KStream<String, Event>> documents(NotificationRepository repository)
return input ->
input.foreach((key, value) ->
LOG.info("Received event, Key: , value: ", key, value);
repository.save(initNotification(value)).subscribe();
);
但这是连接 Spring Cloud Stream 消费者和响应式存储库的正确方法吗?当我最后不得不打电话给subscribe()
时,它看起来不像。
我读了Spring Cloud Stream documentation (for 3.0.0 release),他们说
Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.
在this presentation video 中,他们提到他们使用项目反应器支持反应式编程。所以我想有一种我不知道的方法。你能告诉我怎么做吗?
如果这一切听起来太愚蠢,我深表歉意,但我对 Spring Cloud Stream 和反应式编程非常陌生,还没有找到很多描述这一点的文章。
【问题讨论】:
【参考方案1】:只需使用 Flux 作为消费类型,如下所示:
@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository)
return input ->
input
.map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
.concatMap((value) -> repository.save(initNotification(value)))
.subscribe();
如果您使用带有空返回类型 (Function<Flux<Message<Event>>, Mono<Void>>
) 的 Function
而不是 Consumer,则框架可以自动订阅。对于Consumer
,您必须手动订阅,因为框架没有对流的引用。但是在Consumer
的情况下,您订阅的不是存储库而是整个流,这没关系。
【讨论】:
感谢您的回复,这正是我所需要的。但我试过了,不幸的是它没有用,消息被消耗但从未保存到数据库中。你不是说用 flatMap 代替 doOnNext 吗? 对不起。 concatMap 或 flatMap 你需要什么,我修改答案以上是关于如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 spring boot webflux 上从 mono<user> 获取用户名?
SpringBoot WebFlux - 制作并行 WebClient 请求
如何在 Spring Boot WebFlux 中使用 GET 请求注销