如何防止 Azure webjob 同时多次处理相同的消息

Posted

技术标签:

【中文标题】如何防止 Azure webjob 同时多次处理相同的消息【英文标题】:How to prevent Azure webjob processing same message multiple times concurrently 【发布时间】:2016-12-09 02:46:26 【问题描述】:

我有一个 Azure WebJob 项目,我在我的开发机器上本地运行。它正在侦听 Azure 服务总线消息队列。没有什么比主题更重要的了,只是最基本的消息队列。

它多次接收/处理同一条消息,收到消息后立即启动两次,然后在处理消息时间歇性启动。

问题:

为什么我会立即多次收到相同的消息?似乎是在应用 PeekLock 之前重新获取? 为什么邮件仍在处理中却被重新接收?我可以设置 PeekLock 持续时间,还是以某种方式将消息锁定为只处理一次 如何确保队列中的每条消息只处理一次? 我希望能够一次处理多条消息,而不是多次处理同一条消息,因此将 MaxConcurrentCalls 设置为 1 似乎不是我的答案,还是我误解了该属性?

我使用的是异步函数、简单注入器和自定义 JobActivator,所以我的函数签名不是静态 void 方法,而是:

public async Task ProcessQueueMessage([ServiceBusTrigger("AnyQueue")] MediaEncoderQueueItem message, TextWriter log) ...

在作业内部,它在 blob 服务上移动一些文件,并从媒体服务调用(并等待)媒体编码器。因此,虽然 Web 作业本身并没有进行大量处理,但它需要相当长的时间(对于某些文件,需要 15 分钟)。

应用程序正在启动,当我向队列发布消息时,它会响应。但是,一旦收到消息,它就会多次收到消息:

Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.'
Executing: 'Functions.ProcessQueueMessage' - Reason: 'New ServiceBus message detected on 'MyQueue'.'

此外,当任务运行时(我看到媒体服务功能的输出),它会从队列中获得另一个“副本”。

最后在任务完成后,它仍然间歇性地处理相同的消息。

【问题讨论】:

队列中指定的 DeliveryCount 和 LockDuration 是多少? 我需要调查一下。 LockDuration 的奇怪之处在于,它似乎几乎立即两次接收到消息,就好像它根本没有锁定它一样。我想知道我设置异步处理程序的方式是否真的有问题? 听起来有点不对劲。如果 LockDuration 未过期,代理将永远不会向一个竞争消费者提供相同的消息。我怀疑正在发生其他事情。你有机会在 GitHub 或 BitBucket 上分享一个复制品吗? 【参考方案1】:

我怀疑发生了以下情况: 最大 DurationLock 可以是 5 分钟。如果消息的处理在 5 分钟内完成,则将消息标记为已完成并从代理中删除。否则,如果处理时间超过 5 分钟(我们失去了对消息的锁定),消息将重新出现,并将再次被消费。您可以通过查看邮件中的DeliveryCount 来验证这一点。

要解决此问题,您可以在消息锁定即将到期之前使用BrokeredMessage.RenewLockAsync() 更新消息锁定。

【讨论】:

我将检查该方法,并将查看交付计数和持续时间锁定。奇怪的是,它并非每次都发生,而且它发生的时间似乎并没有太大的一致性,而且它似乎会立即处理两次相同的消息。但是感谢您指出该功能! @AndrewP 如果您的工作抛出未处理的异常,并且您在服务总线配置中设置了 autocompletemessage true,也可能是自动重试的情况。您可以通过在触发函数中简单地抛出一个 ex 来测试这一点,然后向主题发布一条消息 - 您应该会看到一条消息检测到最大重试次数。 锁不是默认自动更新的吗?所以 5 分钟后,如果处理仍在进行中,锁将自动更新。还是我错过了什么? 不是自动的。如果发生这种情况,您可以通过从不完成处理来锁定您的资源。 C# 客户端有该选项,但即便如此,您也需要指定一个限制。

以上是关于如何防止 Azure webjob 同时多次处理相同的消息的主要内容,如果未能解决你的问题,请参考以下文章

如何防止重复消息在 WebJob 处理时不插入到服务总线队列中?

如何基于 Azure 中的服务总线队列自动缩放 Python webjob?

Azure 函数 - 防止多次调用

连续 WebJob 自动停止

如何在没有“始终开启”的情况下保持 Azure WebJob 运行

如何扩展 Azure Webjobs