如何在任何 MQ 平台中实现这个单一并发分布式队列?

Posted

技术标签:

【中文标题】如何在任何 MQ 平台中实现这个单一并发分布式队列?【英文标题】:How can I implement this single concurrency distributed queue in any MQ platform? 【发布时间】:2017-06-18 04:19:02 【问题描述】:

我目前正在努力寻找实现特定类型队列的解决方案,这需要以下特征:

    所有队列都必须遵守添加作业的顺序。 整个队列的并发数为 1,这意味着每个 队列 一次只能执行一个作业,而不是工作线程。 这样的队列将超过几千个。 它需要分布式并且能够扩展(例如,如果我添加了一个工人)

基本上它是一个单进程 FIFO 队列,这正是我在试用 ActiveMQ 或 RabbitMQ 等不同的消息队列软件时想要的,但是一旦我将它扩展到 2 个工作线程,它就不起作用,因为在这种情况下我希望它能够扩展并保持与单个进程队列完全相同的功能。下面我附上它应该如何在具有多个工作人员的分布式环境中工作的描述。

拓扑结构示例:(请注意,QueueWorkers 之间是多对多关系)

如何运行的示例:

+------+-----------------+-----------------+-----------------+
| Step | Worker 1        | Worker 2        | Worker 3        |
+------+-----------------+-----------------+-----------------+
| 1    | Fetch Q/1/Job/1 | Fetch Q/2/Job/1 | Waiting         |
+------+-----------------+-----------------+-----------------+
| 2    | Running         | Running         | Waiting         |
+------+-----------------+-----------------+-----------------+
| 3    | Running         | Done Q/2/Job/1  | Fetch Q/2/Job/2 |
+------+-----------------+-----------------+-----------------+
| 4    | Done Q/1/Job/1  | Fetch Q/1/Job/2 | Running         |
+------+-----------------+-----------------+-----------------+
| 5    | Waiting         | Running         | Running         |
+------+-----------------+-----------------+-----------------+

这可能不是最好的表示,但它表明,即使在 Queue 1Queue 2 中,也有更多的工作,但 Worker 3 确实如此直到前一个作业完成后才开始获取下一个作业。

这是我努力寻找好的解决方案。

我已经尝试了很多其他解决方案,例如rabbitMQ、activeMQ、apollo...这些允许我创建数千个队列,但在我尝试的时候,所有这些都将使用 worker 3 来运行队列中的下一个作业。并发是每个工人

是否有任何解决方案可以在任何 MQ 平台上实现这一点,例如 ActiveMQ、RabbitMQ、ZeroMQ 等?

谢谢你:)

【问题讨论】:

虽然这个话题确实很有趣。在 Stack Overflow 上请求场外资源是题外话。 异地资源是什么意思?如果需要,我会改写这个问题。我想问的更像是一种实现方式,而不是严格寻找程序/解决方案。 【参考方案1】:

您可以使用 Redis 列表和一个额外的“调度”队列来实现这一点,所有工作人员 BRPOP 都在为他们的工作启用该队列。调度队列中的每个作业都使用原始队列 ID 进行标记,当工作人员完成作业时,它会进入这个原始队列并在调度队列上执行RPOPLPUSH 以使下一个作业可用于任何其他工作人员。因此,调度队列最多有 num_queues 个元素。

您必须处理的一件事是当源队列为空时调度队列的初始填充。这可能只是发布者针对最初设置的每个队列的“空”标志进行的检查,并且当原始队列中没有任何东西要分派时也由工作人员设置。如果设置了这个标志,发布者可以直接LPUSH第一个作业进入调度队列。

【讨论】:

以上是关于如何在任何 MQ 平台中实现这个单一并发分布式队列?的主要内容,如果未能解决你的问题,请参考以下文章

消息队列——ActiveMQ

高并发架构系列:MQ消息队列的12点核心原理总结

如何在 QML 中实现对象之间的单一连接?

[Java] 分布式消息队列(MQ)

如何在MQ中实现支持任意延迟的消息?

程序员之消息队列