发件箱模式 - 我们如何防止消息中继进程生成重复消息?
Posted
技术标签:
【中文标题】发件箱模式 - 我们如何防止消息中继进程生成重复消息?【英文标题】:Outbox Pattern - How can we prevent the Message Relay process from generating duplicated messages? 【发布时间】:2019-10-25 19:43:06 【问题描述】:实现outbox pattern 的常用方法是将消息负载存储在发件箱表中,并有一个单独的进程(消息中继)查询未决消息并将它们发布到消息代理,就我而言是卡夫卡。
发件箱表的状态可能如下所示。
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它会定期 (@Scheduled
) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。
第一个问题是:如果我启动多个消息中继实例,所有这些实例都会查询发件箱表,并且可能在某些时候不同的实例会获得相同的 PENDING 注册表以发布到 Kafka ,生成重复的消息。我怎样才能防止这种情况发生?
另一种情况:假设只有一个消息中继。它获取一条 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。
【问题讨论】:
您将发件箱存储在哪种数据库上? 数据库是 MariaDB 为什么不使用Kafka-Connect来控制发送的事件? debezium.io/blog/2019/02/19/… 在类似的情况下调度程序应该运行多长时间,应该几乎是实时的? 【参考方案1】:为了防止第一个问题,您必须使用数据库锁定。
SELECT * FROM outbox WHERE id = 1 FOR UPDATE
这将阻止其他进程访问同一行。
您无法解决的第二个问题,因为您没有使用 Kafka 进行分布式事务。
因此,一种方法是在将记录发送到 Kafka 之前将其设置为 PROCESSING 等状态,如果应用程序崩溃,您应该检查是否有处于 PROCESSING 状态的记录并执行一些清理任务以查明它们是否已经存在发送给卡夫卡。
但最好的解决方案是拥有一个可以处理重复项的幂等系统。
【讨论】:
消费者可以保留一个消息日志,通过messageId检查之前是否有相同的消息到达(如果不能是幂等的)【参考方案2】:您可以使用debezium
(https://debezium.io/) 来读取 SQL 服务器的 bin-log 并将事件写入 Kafka。它将解决您的两个问题。
【讨论】:
【参考方案3】:对于第一个问题,您可以使用ShedLock library。它确保在任何时候,只有一个服务实例在执行计划任务。
对于第二个问题,是的,您必须开发幂等消费者。您可以通过将消息 id 传递给消费者来做到这一点,并维护一个表来检查具有消息 id 的消息是否已被处理,只需忽略它。
【讨论】:
以上是关于发件箱模式 - 我们如何防止消息中继进程生成重复消息?的主要内容,如果未能解决你的问题,请参考以下文章
Debezium 发件箱模式 |如果我们使用 debezium,架构是用 SMT/发件箱表固定的吗
使用 Debezium 的 Quarkus 发件箱模式:如何将自定义列添加到发件箱表