RabbitMQ 后续消息原子性

Posted

技术标签:

【中文标题】RabbitMQ 后续消息原子性【英文标题】:RabbitMQ subsequent message atomicity 【发布时间】:2016-06-07 20:42:20 【问题描述】:

我有一个 RabbitMQ 消息结构,其中一条消息A 应该会生成许多消息,我们称它们为BC。一个消息A被一个工作进程接收,然后处理它并生成消息BC

假设的工作流程如下:

    接收消息Aack=False 开始交易 运行一些代码 生成消息B 生成消息C 发送ack 获取消息A 完成交易

在任何情况下,工作进程在处理消息 A 期间死亡,或者尚未完成事务 - 我希望 RabbitMQ 将消息 A 视为未传递并重新排队。

如果相关,RabbitMQ 在高可用配置中运行。

为什么要尝试清除 RabbitMQ 文档 here,说明:

此外,RabbitMQ 不提供原子性保证,即使在仅涉及单个队列的事务的情况下,例如tx.commit 期间的错误可能导致代理重新启动后出现在队列中的事务发布子集。

我有什么方法可以在 RabbitMQ 或市场上任何其他排队软件的环境中实现我想要的行为? 有什么方法可以让 RabbitMQ 用于多个队列?

【问题讨论】:

【参考方案1】:

在我看来,我的目标是最受欢迎的产品是个错误。 ActiveMQ 完全支持高可用性和事务,所以我刚刚切换到它,我的问题就消失了。

【讨论】:

【参考方案2】:

您将需要相当多的基础架构(代码)才能使其正常工作。这是相当棘手的。

您可能需要考虑使用服务总线。我在这里有一个免费的.net 开源:http://shuttle.github.io/shuttle-esb/

如果您不在 .net 领域或对使用服务总线不感兴趣,您可以按照一些代码查看 Shuttle.Esb 是如何处理其中一些事情的:https://github.com/Shuttle/Shuttle.Esb

您可能还想在这里查看IdempotenceService:https://github.com/Shuttle/Shuttle.Esb.SqlServer/blob/master/Shuttle.ESB.SqlServer/Idempotence/IdempotenceService.cs

使用幂等服务器(在处理程序中)时,所有发送的消息都存储在事务存储中(如本例中的 sql 服务器),并且仅在消息处理成功完成后发送。

对于在消息处理程序之外发送,可以使用事务发件箱(同样,基于 sql server 表的队列也可以)。

关键是你需要认真考虑你的设计,这会很棘手。

【讨论】:

我确切地知道如何实现这一点 - SQL 支持的解决方案将是一个天真的解决方案。我的问题是找到能够在 RabbitMQ 性能级别而不是 Postgres/mysql 上有效执行此操作的正确软件。【参考方案3】:

我们有一些系统在生产中的工作方式与此完全一样,基本格式为:

    获取消息 A 工作 确认消息 A

如果第 2 步失败,消息 A 将“重新排队”并且可以再次获取。

当消息被发送到队列并成功接收时,该消息被标记为ready。当它被发送给消费者时,它会被标记为unacked,直到被消费者确认后,它才会从队列中移除。

在您的示例中,直到您在步骤 #6 中确认消息之前,它仍处于 unacked 状态,这意味着如果工作进程在步骤 #6 之前死亡,则消息将滑回 ready 状态在 RabbitMQ,准备发送给另一个worker。

此功能确实依赖于您没有明确禁用确认这一事实。如果是这样,我很确定消息在发送给消费者后会立即从队列中删除。

【讨论】:

如果您希望在消息 A 之后发送后续消息 B,该怎么办?显然,以原子方式 - “确认并发布消息 B”或“noack 和消息 B 从未发布过”? 在这种情况下,您将在“Do work”阶段发送消息 B,因此如果消息 A 得到确认,则您知道消息 B 已发送,反之亦然反之亦然。 如果可以发送消息,但无法完成工作怎么办? 那么这更多取决于您如何发送和处理消息。就个人而言,在使用消息队列时,我会确保我编写的消费者只执行一项工作,因此如果消息已被处理,那么我知道 所有 的工作已经完成。因此,在您的情况下,如果您想同时发送消息 B 做一些工作,请创建两个 不同的 消费者,一个会发送消息,一个会工作。无论使用哪种排队系统,执行多阶段任务都是有风险的,这也是为什么idempotence 也是如此重要的因素。

以上是关于RabbitMQ 后续消息原子性的主要内容,如果未能解决你的问题,请参考以下文章

消息未读数分布式锁和原子性

Java内存模型

Redis事务

什么是程序的原子性

2. 原子性 Atomic

Redis如何保证原子性