RabbitMQ 后续消息原子性
Posted
技术标签:
【中文标题】RabbitMQ 后续消息原子性【英文标题】:RabbitMQ subsequent message atomicity 【发布时间】:2016-06-07 20:42:20 【问题描述】:我有一个 RabbitMQ 消息结构,其中一条消息A
应该会生成许多消息,我们称它们为B
和C
。一个消息A
被一个工作进程接收,然后处理它并生成消息B
和C
。
假设的工作流程如下:
-
接收消息
A
和ack=False
开始交易
运行一些代码
生成消息B
生成消息C
发送ack
获取消息A
完成交易
在任何情况下,工作进程在处理消息 A
期间死亡,或者尚未完成事务 - 我希望 RabbitMQ 将消息 A
视为未传递并重新排队。
如果相关,RabbitMQ 在高可用配置中运行。
为什么要尝试清除 RabbitMQ 文档 here,说明:
我有什么方法可以在 RabbitMQ 或市场上任何其他排队软件的环境中实现我想要的行为? 有什么方法可以让 RabbitMQ 用于多个队列?此外,RabbitMQ 不提供原子性保证,即使在仅涉及单个队列的事务的情况下,例如tx.commit 期间的错误可能导致代理重新启动后出现在队列中的事务发布子集。
【问题讨论】:
【参考方案1】:在我看来,我的目标是最受欢迎的产品是个错误。 ActiveMQ 完全支持高可用性和事务,所以我刚刚切换到它,我的问题就消失了。
【讨论】:
【参考方案2】:您将需要相当多的基础架构(代码)才能使其正常工作。这是相当棘手的。
您可能需要考虑使用服务总线。我在这里有一个免费的.net 开源:http://shuttle.github.io/shuttle-esb/
如果您不在 .net 领域或对使用服务总线不感兴趣,您可以按照一些代码查看 Shuttle.Esb 是如何处理其中一些事情的:https://github.com/Shuttle/Shuttle.Esb
您可能还想在这里查看IdempotenceService
:https://github.com/Shuttle/Shuttle.Esb.SqlServer/blob/master/Shuttle.ESB.SqlServer/Idempotence/IdempotenceService.cs
使用幂等服务器(在处理程序中)时,所有发送的消息都存储在事务存储中(如本例中的 sql 服务器),并且仅在消息处理成功完成后发送。
对于在消息处理程序之外发送,可以使用事务发件箱(同样,基于 sql server 表的队列也可以)。
关键是你需要认真考虑你的设计,这会很棘手。
【讨论】:
我确切地知道如何实现这一点 - SQL 支持的解决方案将是一个天真的解决方案。我的问题是找到能够在 RabbitMQ 性能级别而不是 Postgres/mysql 上有效执行此操作的正确软件。【参考方案3】:我们有一些系统在生产中的工作方式与此完全一样,基本格式为:
-
获取消息 A
工作
确认消息 A
如果第 2 步失败,消息 A 将“重新排队”并且可以再次获取。
当消息被发送到队列并成功接收时,该消息被标记为ready
。当它被发送给消费者时,它会被标记为unacked
,直到被消费者确认后,它才会从队列中移除。
在您的示例中,直到您在步骤 #6 中确认消息之前,它仍处于 unacked
状态,这意味着如果工作进程在步骤 #6 之前死亡,则消息将滑回 ready
状态在 RabbitMQ,准备发送给另一个worker。
此功能确实依赖于您没有明确禁用确认这一事实。如果是这样,我很确定消息在发送给消费者后会立即从队列中删除。
【讨论】:
如果您希望在消息 A 之后发送后续消息 B,该怎么办?显然,以原子方式 - “确认并发布消息 B”或“noack 和消息 B 从未发布过”? 在这种情况下,您将在“Do work”阶段发送消息 B,因此如果消息 A 得到确认,则您知道消息 B 已发送,反之亦然反之亦然。 如果可以发送消息,但无法完成工作怎么办? 那么这更多取决于您如何发送和处理消息。就个人而言,在使用消息队列时,我会确保我编写的消费者只执行一项工作,因此如果消息已被处理,那么我知道 所有 的工作已经完成。因此,在您的情况下,如果您想同时发送消息 B 和 做一些工作,请创建两个 不同的 消费者,一个会发送消息,一个会工作。无论使用哪种排队系统,执行多阶段任务都是有风险的,这也是为什么idempotence 也是如此重要的因素。以上是关于RabbitMQ 后续消息原子性的主要内容,如果未能解决你的问题,请参考以下文章