Azure 服务总线可以在重试消息之前延迟吗?

Posted

技术标签:

【中文标题】Azure 服务总线可以在重试消息之前延迟吗?【英文标题】:Can the Azure Service Bus be delayed before retrying a message? 【发布时间】:2014-02-27 11:48:10 【问题描述】:

Azure 服务总线支持内置的重试机制,该机制使放弃的消息立即可见,以便再次尝试读取。我正在尝试使用这种机制来处理一些暂时性错误,但消息在被放弃后立即可用。

我想做的是让消息在被放弃后的一段时间内不可见,最好是基于指数递增的策略。

我尝试在放弃消息时设置ScheduledEnqueueTimeUtc属性,但似乎没有效果:

var messagingFactory = MessagingFactory.CreateFromConnectionString(...);

var receiver = messagingFactory.CreateMessageReceiver("test-queue");

receiver.OnMessageAsync(async brokeredMessage =>

    await brokeredMessage.AbandonAsync(
        new Dictionary<string, object>
        
             "ScheduledEnqueueTimeUtc", DateTime.UtcNow.AddSeconds(30) 
        );
    
);

我考虑过根本不放弃消息而只是让锁过期,但这需要有一些方法来影响MessageReceiver 如何指定消息的锁定持续时间,而我在其中找不到任何内容让我更改此值的 API。此外,在已经需要锁定之前,无法读取消息的传递计数(并因此决定等待下一次重试的时间)。

消息总线中的重试策略是否会受到某种方式的影响,或者是否可以通过其他方式人为地引入延迟?

【问题讨论】:

由于没有任何立即好的答案,我已经在服务总线用户语音上打开了一个请求:feedback.windowsazure.com/forums/216926-service-bus/suggestions/…。随意支持或忽略。 更新历史参考链接:feedback.azure.com/forums/216926-service-bus/suggestions/… 【参考方案1】:

This question 询问如何在 Azure Functions 中实现指数退避。如果您不想使用内置的 RetryPolicy(仅在 autoComplete = false 时可用),这是我一直在使用的解决方案:

        public static async Task ExceptionHandler(IMessageSession MessageSession, string LockToken, int DeliveryCount)
        
            if (DeliveryCount < Globals.MaxDeliveryCount)
            
                var DelaySeconds = Math.Pow(Globals.ExponentialBackoff, DeliveryCount);
                await Task.Delay(TimeSpan.FromSeconds(DelaySeconds));
                await MessageSession.AbandonAsync(LockToken);
            
            else
            
                await MessageSession.DeadLetterAsync(LockToken);
            
        

【讨论】:

【参考方案2】:

我遇到了类似的问题,我们的订单拣选系统是遗留系统,每晚都会进入维护模式。

使用本文中的想法(https://markheath.net/post/defer-processing-azure-service-bus-message),我创建了一个自定义属性来跟踪一条消息被重新提交的次数,并在 10 次尝试后手动对消息进行死信。如果消息重试次数低于 10 次,它会克隆消息,增加自定义属性并设置新消息的队列。

using Microsoft.Azure.ServiceBus;
public PickQueue()

    queueClient = new QueueClient(QUEUE_CONN_STRING, QUEUE_NAME); 

public async Task QueueMessageAsync(int OrderId)

    string body = JsonConvert.SerializeObject(OrderId);
    var message = new Message(Encoding.UTF8.GetBytes(body));
    await queueClient.SendAsync(message);


public async Task ReQueueMessageAsync(Message message, DateTime utcEnqueueTime)

    int resubmitCount = (int)(message.UserProperties["ResubmitCount"] ?? 0) + 1;
    if (resubmitCount > 10)
    
        await queueClient.DeadLetterAsync(message.SystemProperties.LockToken);
    
    else
    
        Message clone = message.Clone();
        clone.UserProperties["ResubmitCount"] = ++resubmitCount;
        await queueClient.ScheduleMessageAsync(message, utcEnqueueTime);
    

【讨论】:

【参考方案3】:

我实际上是asked this same question last year(抛开实现)使用了我能想到的三种查看 API 的方法。在 SB 团队工作的@ClemensVasters 回应说,使用 Defer 和某种重新接收确实是精确控制这一点的唯一方法。

您可以阅读我对他的回答的评论以了解具体方法,我建议使用辅助队列来存储指示哪些主要消息已被延迟并且需要从主队列重新接收的消息。然后,您可以通过在这些辅助消息上设置ScheduledEnqueueTimeUtc 来控制等待的时间,以准确地控制在重试之前等待的时间。

【讨论】:

二级队列引入了一些原子性和“恰好一次”处理的问题,以及队列的加倍。也许从死信队列中读取并在那里实现延迟会更好? 我认为 Drew 的方法会很有效。原始消息永远不会从主队列中删除。它被标记为延迟,这意味着除非使用序列号来检索它,否则它不会被取回。辅助队列消息仅包含序列号。请记住,带有窥视锁的代理消息传递为您提供“至少一次”处理,而不是“恰好一次”处理。如果您只需要一次,您必须自己处理。 此外,使用 Drew 建议的方法允许您使用 ScheduledEnqueueTimeUtc 控制辅助队列消息“弹出”的时间。如果您只使用死信队列,那么除非您控制从死信中提取的速度,否则您的情况并没有好转。 同意,真正的重复更难处理。关于延迟队列,您将通过提取序列号然后处理来自主节点的消息来处理该队列;但是,您是正确的,这可能会导致原子性问题,因为事务支持非常有限。有关交易的更多信息,请参阅geekswithblogs.net/asmith/archive/2012/04/02/149176.aspx。 如果要引入辅助队列,延迟消息有什么意义?只需使用 ScheduledEnqueueTimeUtc 将相同的消息重新排队到同一个队列中【参考方案4】:

这里要小心,因为我认为您将重试功能与用于OnMessage 事件驱动消息处理的自动Complete/Abandon 机制混淆了。当对服务总线的调用失败时,内置的重试机制就会发挥作用。例如,如果您调用将消息设置为完成但失败,则重试机制将启动。如果您正在处理消息,则会在您自己的代码中发生异常,该异常不会通过重试功能触发重试。您的问题没有明确说明错误是来自您的代码还是在尝试联系服务总线时。

如果您确实在修改了尝试与服务总线通信时发生错误时发生的重试策略,您可以修改在MessageReciver 本身上设置的RetryPolicy。有一个默认使用的RetryExponitial,还有一个抽象的RetryPolicy,您可以从中创建自己的。

我认为您追求的是更好地控制当您在处理过程中遇到异常时会发生什么,并且您希望推迟处理该消息。有几个选项:

创建消息处理程序时,您可以设置OnMessageOptions。属性之一是“自动完成”。默认情况下,它设置为 true,这意味着一旦消息处理完成,Complete 方法就会自动调用。如果发生异常,则会自动调用放弃,这就是您所看到的。通过将 AutoComplete 设置为 false,您需要从消息处理程序中自行调用 Complete。否则将导致消息锁最终用完,这是您正在寻找的行为之一。

因此,您可以编写处理程序,以便在处理过程中发生异常时您根本不调用Complete。然后,该消息将保留在队列中,直到它的锁用完,然后再次可用。应用标准死信机制,在尝试 x 次后,它将自动放入死信队列。

以这种方式处理的一个注意事项是,任何类型的异常都会以这种方式处理。您确实需要考虑执行此操作的异常类型以及您是否真的要推迟处理。例如,如果您在处理过程中调用第三方系统并且它给您一个您知道是暂时的异常,很好。但是,如果它给您一个错误,您知道这将是一个大问题,那么您可能会决定在系统中执行其他操作,而不仅仅是放弃该消息。

您还可以查看“Defer”方法。然后,此方法实际上将不允许该消息从队列中处理,除非它是通过其序列号专门拉出的。您的代码必须记住序列号值并拉取它。不过这和你描述的不太一样。

另一个选项是您可以摆脱 OnMessage、事件驱动的消息处理方式。虽然这非常有帮助,但您无法控制很多事情。而是连接您自己的处理循环并自行处理放弃/完成。您还需要处理 OnMessage 模式为您提供的一些线程/并发调用管理。这可能需要更多的工作,但您拥有最大的灵活性。

最后,我相信您对 AbandonAsync 的调用传递您要修改的属性不起作用的原因是这些属性是指方法上的 Metadata properties,而不是 BrokeredMessage 上的标准属性。

【讨论】:

迈克,对不起,我还不够清楚。这与自动完成/放弃机制无关,更多的是与限制重试先前放弃的消息的速度有关。问题在于传递尝试之间没有延迟,因此无法处理消息处理中的长期(分钟,而不是毫秒)瞬态错误。 很公平。根据您的陈述“我考虑过根本不放弃消息而只是让锁过期”,上述不调用完成的机制将起作用。尽管我认为 Drew 在下面的回答更好,但对于您更大的要求。

以上是关于Azure 服务总线可以在重试消息之前延迟吗?的主要内容,如果未能解决你的问题,请参考以下文章

Azure 服务总线 http 与 websocket

在重试同一条消息之前,GCM CCS 等待 XMPP 服务器的确认多长时间?

读取 Azure 服务总线队列中的所有活动消息

Azure 服务总线 - 取消或重新安排没有序列号的预定消息

消息正文上的 Azure 服务总线订阅筛选器

在 web api 中接收服务总线消息队列/主题