RabbitMQ 消费者中的并发消息处理
Posted
技术标签:
【中文标题】RabbitMQ 消费者中的并发消息处理【英文标题】:Concurrent message processing in RabbitMQ consumer 【发布时间】:2015-10-03 05:09:11 【问题描述】:我是 RabbitMQ 的新手,所以如果我的问题听起来微不足道,请原谅。我想在 RabbitMQ 上发布消息,该消息将由 RabbitMQ 消费者处理。
我的消费机器是多核机器(最好是 azure 上的工人角色)。但是 QueueBasicConsumer 一次推送一条消息。如何编程以利用可以同时处理多条消息的所有核心。
-
一种解决方案可能是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程数。
另一种方法是在主线程上读取消息,然后创建任务并将消息传递给该任务。在这种情况下,如果有很多消息(在阈值之后)已经在进行中,我将不得不停止使用消息。不确定如何实现。
提前致谢
【问题讨论】:
这个帖子可能有帮助:***.com/questions/18531072/… 【参考方案1】:您的第二个选项听起来更合理 - 在单个通道上消费并产生多个任务来处理消息。要实现并发控制,您可以使用semaphore 来控制正在运行的任务数。在开始任务之前,您会等待信号量变为可用,并且在任务完成后,它会向信号量发出信号以允许其他任务运行。
您尚未指定选择的语言/技术堆栈,但无论您做什么 - 尝试利用线程池,而不是自己创建和管理线程。在 .NET 中,这意味着使用 Task.Run
异步处理消息。
示例 C# 代码:
using (var semaphore = new SemaphoreSlim(MaxMessages))
while (true)
var args = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
semaphore.Wait();
Task.Run(() => ProcessMessage(args))
.ContinueWith(() => semaphore.Release());
与其自己控制并发级别,您可能会发现在通道上启用显式 ACK 控制更容易,并使用RabbitMQ Consumer Prefetch 设置未确认消息的最大数量。这样一来,您就不会一次收到比您想要的更多的消息。
【讨论】:
以上是关于RabbitMQ 消费者中的并发消息处理的主要内容,如果未能解决你的问题,请参考以下文章