Spring Cloud Stream 如何防止应用程序的实例接收重复消息?

Posted

技术标签:

【中文标题】Spring Cloud Stream 如何防止应用程序的实例接收重复消息?【英文标题】:How Spring Cloud Stream prevents the application’s instances from receiving duplicate messages? 【发布时间】:2019-08-05 03:21:53 【问题描述】:

Spring Cloud Stream基于至少一次方法,这意味着在极少数情况下重复消息可以到达端点.

Spring Cloud Stream 是否保留已接收消息的缓冲区?

企业集成模式一书中的IdempotentReceiver 建议: 将接收器设计为幂等接收器,它可以安全地多次接收相同的消息。

Spring Cloud Stream 是否控制消费者中的重复消息?

更新:

Spring Cloud Stream 的一段话说:

4.5.1。耐用性 与 Spring Cloud Stream 自以为是的应用模型一致,消费者组订阅是持久的。也就是说,绑定器实现确保组订阅是持久的,并且一旦创建了组的至少一个订阅,该组就会接收消息,即使它们是在组中的所有应用程序停止时发送的。 匿名订阅本质上是不持久的。对于某些 binder 实现(例如 RabbitMQ),可能会有非持久组订阅。 通常,在将应用程序绑定到给定目标时,最好始终指定消费者组。在扩展 Spring Cloud Stream 应用程序时,您必须为其每个输入绑定指定一个使用者组。 这样做可以防止应用程序的实例接收到重复的消息(除非需要这种行为,这是不寻常的)。

【问题讨论】:

上述段落 (4.5.1) 适用于“放大”应用程序。当有多个实例从同一目的地消费时,每个实例都会收到所有消息,从而导致重复处理,除非应用程序指定了消费者组。如果为应用指定了一个组,则一个实例会接收每条消息。 谢谢@dturanski。 【参考方案1】:

如果您担心应用程序从代理接收并处理消息但在确认消息之前崩溃的情况,则可能会发生这种情况。 Spring 云流应用程序启动器支持自动配置持久化message metadata store,它支持 Spring Integration 的IdempotentReceiverInterceptor。 SFTP source app starter 就是一个例子。默认情况下,sftp 源使用内存中的元数据存储,因此它不会在重新启动后继续存在,但可以自定义为使用持久存储。

【讨论】:

非常感谢。我会检查您的解决方案。【参考方案2】:

我认为您对 spring-cloud-stream 框架的责任的假设是不正确的。 简而言之,Spring-cloud-stream 是一个框架,负责将开发人员提供的 生产者/消费者 连接到 spring-cloud-stream 绑定器(例如,Kafka)暴露的消息代理。 、兔子、Kinesis 等)。 因此,连接到代理、从代理接收消息、对其进行反序列化、调用用户代码、序列化消息并将其发送回代理属于框架责任范围。因此,您可以将其视为纯粹的基础设施。

您所描述的更多是应用程序问题,因为实际接收器是用户将作为 spring-cloud-stream 开发体验的一部分开发的东西,因此幂等性的责任将由此类用户承担。 此外,最重要的是,大多数代理已经通过确保特定消息仅传递一次来处理幂等性(在某种程度上)。也就是说,如果有人向这样的代理发送相同的消息,它将不知道它是重复的,因此幂等性和/或重复数据删除的要求仍然有效,但正如您所看到的,考虑到因素的数量,它并不那么简单你对幂等性的理解可能与我的不同,因此我们的方法也可能不同。 最后一件事(部分证明了我的最后一点):可以安全地多次接收相同的消息。 - 这就是它的全部内容,但 安全地 对你真正意味着什么vs. 我 vs. 其他人?

【讨论】:

感谢 Oleg。我用文档中的一段更新了问题。 我认为部分文档需要审核。

以上是关于Spring Cloud Stream 如何防止应用程序的实例接收重复消息?的主要内容,如果未能解决你的问题,请参考以下文章

如何在spring cloud stream和kafka中从同一主题发送和接收

如何构建 Spring Cloud Stream JMS ActiveMQ

Spring Cloud Stream GCP如何重新排队失败的消息

spring-cloud-stream kafka 消费者并发

Spring Cloud Stream如何消费自己生产的消息

Spring Cloud Stream 验证