如何在确保每个实体 FIFO 的同时并行处理消息?

Posted

技术标签:

【中文标题】如何在确保每个实体 FIFO 的同时并行处理消息?【英文标题】:How do you process messages in parallel while ensuring FIFO per entity? 【发布时间】:2011-08-06 12:48:17 【问题描述】:

假设您的系统中有一个实体,例如“Person”,并且您想要处理修改各种 Person 实体的事件。重要的是:

同一个人的事件按先进先出顺序处理 多个 Person 事件流由不同的线程/进程并行处理

我们有一个使用共享数据库和锁来解决这个问题的实现。线程竞争获取一个 Person 的锁,然后在获取锁后按顺序处理事件。我们希望移动到消息队列以避免轮询和锁定,我们认为这会减少数据库的负载并简化消费者代码的实现。

我对 ActiveMQ、RabbitMQ 和 HornetQ 进行了一些研究,但我没有看到明显的实现方法。

ActiveMQ 支持消费者订阅通配符,但我没有看到将每个队列上的并发限制为 1 的方法。如果我能做到这一点,那么解决方案将很简单:

以某种方式告诉代理允许所有以 /queue/person 开头的队列的并发数为 1。 发布者使用队列名称中的人员 ID 将事件写入队列。例如:/queue/person.20 消费者使用通配符订阅队列:/queue/person。> 每个消费者都会收到不同人员队列的消息。如果所有人员队列都在使用中,一些消费者可能会闲置,这没关系 处理完一条消息后,消费者发送一个 ACK​​,告诉代理它处理完该消息,并允许将该 Person 队列的另一条消息发送给另一个消费者(可能是同一个消费者)

ActiveMQ 接近了:您可以进行通配符订阅并启用“独占消费者”,但这种组合会导致单个消费者接收发送到所有匹配队列的所有消息,从而将所有人员的并发性降低到 1。我觉得我错过了一些明显的东西。

问题:

有没有办法通过任何主要的消息队列实现来实现上述方法?我们对选择持相当开放的态度。唯一的要求是它必须在 Linux 上运行。 是否有其他方法可以解决我没有考虑的一般问题?

谢谢!

【问题讨论】:

【参考方案1】:

如果您已经有一个允许共享锁的系统,为什么不为每个队列设置一个锁,消费者在从队列中读取之前必须获取该锁?

【讨论】:

使用非轮询队列协议(AMQP、STOMP),代理已经根据订阅规则将消息传递给消费者。消费者可以查询锁定表并等待它可用,但这会引入复杂性并降低并行性,因为消费者可能正在为另一个实体处理消息而不是等待锁定。 是的,但是消费者不需要等待锁——它可以传递到下一个队列。大概您的队列协议还允许消费者拒绝从队列中读取消息,然后他们可以测试锁。【参考方案2】:

解决此问题的一种通用方法(如果我的问题正确的话)是为 Person 引入一些独特的属性(例如,Person 的数据库级 id)并使用该属性的哈希作为 FIFO 队列的索引来放置它人在。 由于该属性的散列可能很大(您负担不起 2^32 个队列/线程),因此仅使用该散列的 N 个最低有效位。 每个 FIFO 队列都应该有专门的工作人员来处理它——瞧,你的要求已经满足了!

这种方法有一个缺点 - 您的 Person 必须具有分布良好的 id 才能使所有队列以或多或少相等的负载工作。如果您不能保证,请考虑使用轮询队列集并跟踪现在正在处理哪些人员,以确保同一人员的顺序处理。

【讨论】:

在您的解决方案中,消费者是否仍然进行通配符订阅?如果是这样,我如何确保每个队列只有一个活跃的消费者?或者您是说我将队列名称向下散列,以便 N == # 的活跃消费者? @James 很抱歉让您感到困惑,我谈到了通用解决方案,没有考虑 JMS。也就是说,只是线程和 JDK 集合。因此,在我的解决方案中,必须手动设置队列和消费者之间的映射——这通常很容易完成。 啊,我明白了。谢谢。【参考方案3】:

看起来 JMSXGroupID 是我正在寻找的。来自 ActiveMQ 文档:

http://activemq.apache.org/message-groups.html

他们的股票价格示例用例正是我所追求的。我唯一担心的是如果单个消费者死亡会发生什么。希望代理能够检测到这一点并选择另一个消费者与该组 ID 相关联。

【讨论】:

看起来这完全可行。如果有人感兴趣,这是一个我已经成功针对 ActiveMQ 和 HornetQ 测试过此类问题的 python 示例:gist.github.com/923228 From oracle JMS 1.1 especification 我找到了JMSXGroupID,看起来它是所有 JMS 实现所必需的。但它没有详细解释预期的行为,然后将取决于提供者

以上是关于如何在确保每个实体 FIFO 的同时并行处理消息?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用rabbitmq同步消费者

寻找正确的 mule 组件以按 fifo 顺序解复用消息

POSIX消息队列 - mq_send线程唤醒命令

并行代码扩展性差

EIP/Apache Camel - 如何同时处理消息,但每个组原子处理?

java多线程