如何根据条件限制并发消息消耗

Posted

技术标签:

【中文标题】如何根据条件限制并发消息消耗【英文标题】:How to limit concurrent message consuming based on a criteria 【发布时间】:2015-04-09 11:17:56 【问题描述】:

场景(我已经简化了):

许多最终用户可以从前端 Web 应用程序(生产者)开始工作(繁重的工作,例如渲染大 PDF)。 作业被发送到单个持久 RabbitMQ 队列。 许多工作应用程序(消费者)处理这些作业并将结果写回数据存储区。

这种相当标准的模式运行良好。

问题:如果用户在同一分钟内启动了 10 个作业,并且在一天中的那个时间只有 10 个工作应用程序启动,那么这个最终用户实际上是在为自己接管所有计算时间.

问题:如何确保每个最终用户在任何时候都只处理一个作业? (奖励:不得限制某些最终用户(例如管理员))

另外,我不希望前端应用程序阻止最终用户启动并发作业。我只希望最终用户一次完成一项并发作业。

解决方案?:我应该为每个最终用户动态创建一个自动删除独占队列吗?如果是,我如何告诉工作应用程序开始使用此队列?如何确保一个(并且只有一个)工作人员会从这个队列中消费?

【问题讨论】:

一名工人排队。所以你可以计算类似:userid % workercount,添加路由,如rabbitmq.com/tutorials/tutorial-five-dotnet.html。因此,一次只能处理一个用户的一项任务。 我发现这种方法存在三个问题:1)工作人员的数量必须是相对静态的,该算法才能正常工作,2)生产者需要实时知道有多少消费者上线,3)如果某些用户开始的工作比其他用户多,那么工作量可能不会在工作人员之间公平分配。谢谢,但我希望有人能帮助我了解如何设置我的 RabbitMQ 队列和交换来实现这一点(完全有可能:))。 如果我对您的理解正确,您需要一些能够自动创建和终止工作人员并平均分配任务的东西。尝试添加调度程序节点或节点取决于您有多少任务。调度程序将为用户添加队列,当工作人员结束所有用户任务时,它会向调度程序发送消息以删除队列。一名工作人员可以处理多个用户队列,但在您的情况下,只有一名工作人员可以处理一名用户(管理员除外)。 使用redis或zookeeper控制正在处理的并发用户数 Robinho,是否愿意将您的评论扩展为答案? 【参考方案1】:

正如 Dimos 所说,您需要自己构建一些东西来实现这一点。这是一个替代实现,它需要一个额外的队列和一些持久存储。

与现有的作业队列一样,创建一个“可处理的作业队列”。只有满足您的业务规则的作业才会添加到此队列中。 为作业队列创建一个使用者(名为“限制器”)。限制器还需要持久存储(例如 Redis 或关系数据库)来记录当前正在处理的作业。限制器从作业队列中读取并写入可处理的作业队列。

当工作应用程序完成处理作业时,它会将“作业完成”事件添加到作业队列中。

------------     ------------     ----------- 
| Producer | -> () job queue ) -> | Limiter | 
------------     ------------     ----------- 
                     ^                |                    
                     |                V                    
                     |     ------------------------       
                     |    () processable job queue )  
       job finished  |     ------------------------       
                     |                |
                     |                V
                     |     ------------------------
                     \-----| Job Processors (x10) |
                           ------------------------

限制器的逻辑如下:

收到作业消息后,检查持久存储以查看当前用户是否已在运行作业: 如果不是,则将存储中的作业记录为正在运行,并将作业消息添加到可处理作业队列中。 如果现有作业正在运行,请将存储中的作业记录为待处理作业。 如果作业是针对管理员用户的,请始终将其添加到可处理作业队列中。 当收到“作业完成”消息时,从持久存储中的“正在运行的作业”列表中删除该作业。然后检查该用户的待处理作业的存储: 如果找到作业,将该作业的状态从挂起更改为正在运行,并将其添加到可处理作业队列中。 否则,什么也不做。 一次只能运行一个限制器进程实例。这可以通过仅启动限制器进程的单个实例或通过在持久存储中使用锁定机制来实现。

它相当重量级,但如果您需要查看发生了什么,您可以随时检查持久存储。

【讨论】:

感谢您的回答。这个带有持久存储的Limiter 概念确实很重量级。我也不喜欢这个过程会以某种方式成为单点故障的事实。 您可以跨多个服务器启动多个Limiter进程实例,并依靠持久存储中的锁定机制来确保一次只能处理一个Limiter。这样,如果一台服务器出现故障,其他 Limiter 进程将继续处理。 确实,对不起,你是对的。一次只能运行一个限制器进程,但可以启动多个。【参考方案2】:

rabbitMQ 本身并没有提供这样的功能。 但是,您可以通过以下方式实现它。但是,您将不得不使用轮询,这不是那么有效(与订阅/发布相比)。您还必须利用 Zookeeper 来协调不同的工作人员。

您将创建 2 个队列:1 个高优先级队列(用于管理作业)和 1 个低优先级队列(用于普通用户作业)。这 10 个工作人员将从两个队列中检索消息。每个工作人员将执行一个无限循环(理想情况下,当队列为空时,会有睡眠间隔),它会尝试从每个队列中互换地检索消息:

对于高优先级队列,worker 只需检索一条消息,对其进行处理并向队列确认。 对于低优先级队列,worker 尝试在 Zookeeper 中持有锁(通过写入特定的文件-znode),如果成功,则读取一条消息,对其进行处理并确认。如果 zookeeper 写入不成功,则其他人持有锁,因此该 worker 跳过此步骤并重复循环。

【讨论】:

那么每个可能上传文件的最终用户都有一个 Zookeeper 锁?如果低优先级队列上的第一条消息是给已经在处理某些东西的用户的,那么没有工作人员将能够处理该队列上的任何消息?还是我误会了? 我的问题已经有几个月了。我现在已经切换到 SQS 而不是 RabbitMQ。但是我最初的问题仍然存在,我的基于 SQS 的实现仍然存在这个问题。我可能会尝试实施您在此答案中提出的建议。由于我已经使用轮询 SQS,所以这部分没问题。但是我不想在我的堆栈中添加其他组件,所以我不会使用 Zookeeper,而是使用 Redis(我已经将它用于缓存和计数器)。无论如何,谢谢你的回答。我接受它。 @WW。低优先级队列的所有消息都有一个(全局)锁。 10 个工作进程中的每一个都应首先获取此锁,然后再从队列中读取消息。此外,每个工作人员必须在处理完消息后释放锁。本质上,消息的 read-process-ack 处理正在以这种方式转换为关键部分,因此最多一个工作人员可以一直执行它。 @Pierre-DavidBelange,尽管如此,轮询不是必需的。您可以使用 pub/sub 来实现它,但队列必须通知每个新成员的所有工作人员,并且只有第一个获得锁的工作人员才会使用它。 Redis 也可以成为 Zookeeper 的同等替代品,因为您可以使用乐观的locking & watches 来创建高度可扩展的解决方案。

以上是关于如何根据条件限制并发消息消耗的主要内容,如果未能解决你的问题,请参考以下文章

如何设置servlet中并发请求数的限制?

根据半径限制引脚数量?

LC2468.根据限制分割消息(枚举&模拟)

LC2468.根据限制分割消息(枚举&模拟)

MassTransit:在消费者消费完所有消息后如何停止公共汽车?

nginx 根据条件限制ip访问