如何每隔 X 秒批量处理来自服务总线队列的消息

Posted

技术标签:

【中文标题】如何每隔 X 秒批量处理来自服务总线队列的消息【英文标题】:How to Bulk Process Messages from Service Bus Queue Every X Seconds 【发布时间】:2015-08-05 14:47:44 【问题描述】:

我的服务总线队列之前一次处理一条消息。 我想更改它以允许消息排队,这样我就可以每 X 秒批量处理它们。我最初是这样做的:

var messagingFactorySettings = new MessagingFactorySettings

    NetMessagingTransportSettings =  
        BatchFlushInterval = TimeSpan.FromMilliseconds(10000) ,
        TokenProvider = credentials
;

现在我想接收消息,但似乎我必须执行这个无限循环来获取它们:

while ((messages = myQueueClient.ReceiveBatch(1000).ToList()) != null)

    foreach (var message in messages)
     ...

据我了解,BatchFlushInterval 将允许请求累积 X 时间,因此我不会一一接收消息,而是批量接收消息。因此,我不确定为什么我不能像以前那样做:

myQueueClient.OnMessage((m) =>

但是批量版本:

myQueueClient.OnBulkMessage((listOfMessages) =>

我是否遗漏了什么,或者是不断轮询完成此任务的唯一方法?我的 BatchFlushInterval 似乎也被忽略了。我预计它只会每 10 秒检查一次新消息,但它会立即接收第一批消息,并且会立即处理任何进来的新消息。

假设我想每隔 X(即:1)秒从队列中提取 Y 条消息(即:1000)并立即处理它们,我该怎么做? 为什么BatchFlushInterval 没有任何影响吗?

【问题讨论】:

【参考方案1】:

看来,一个简单的Thread.Sleep(x) 就可以了。

我发现暂停直到总时间过去的问题很有趣,所以我开始实施一个秒表子类,使这个问题可以以更清晰、更易读的方式解决。

while ((var messages = myQueueClient.ReceiveBatch(1000)) != null)

    var sw = WaitableStopwatch.StartNew();

    // ReceiveBatch() return IEnumerable<>. No need for .ToList().
    foreach (var message in messages)
    
        ...
    

    // If processing took less than 10 seconds, sleep
    // for the remainder of that time span before getting
    // the next batch.
    sw.Wait(Timespan.FromSeconds(10));




/// <summary>
/// Extends Stopwatch with the ability to wait until a specified
/// elapsed time has been reached.
/// </summary>
public class WaitableStopwatch : Stopwatch

    /// <summary>
    /// Initializes a new WaitableStopwatch instance, sets the elapsed
    /// time property to zero, and starts measuring elapsed time.
    /// </summary>
    /// <returns>A WaitableStopwatch that has just begun measuring elapsed time.</returns>
    public static new WaitableStopwatch StartNew()
    
        WaitableStopwatch sw = new WaitableStopwatch();

        sw.Start();

        return sw;
    

    /// <summary>
    /// Waits until the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public void Wait(int elapsedMilliseconds)
    
        Wait(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public void Wait(TimeSpan elapsed)
    
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        
            Thread.Sleep(diff);
        
    

    /// <summary>
    /// Waits until when the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public Task WaitAsync(int elapsedMilliseconds)
    
        return WaitAsync(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public async Task WaitAsync(TimeSpan elapsed)
    
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        
            await Task.Delay(diff);
        
    

【讨论】:

循环仅每 30 秒进入一次,它会自动处理抓取新消息,但它并没有按照我设置为 BatchFlushInterval 的速率发生。即使我要摆脱 while 循环并编写自己的计时器来调用 ReceiveBatch,我认为这应该是不必要的。使用此代码,计时器将在 30 秒延迟之上。 这个问题并不完全清楚。您是说 ReceiveBatch() 需要 30 秒才能返回,但只返回一条消息,即使消息很多? 我更新了我的问题。实际上,如果那里有新数据,它会自动返回。我设置了一个计时器来继续添加记录,并且它们会立即得到处理,而不是遵守 BatchFlushInterval 中的设置。 在这种情况下,我的回答似乎应该可以解决问题,显然是在更改为十秒之后。我不知道 BatchFlushInterval 设置。 您必须使用计时器可能是对的。我在网上找到了一个链接,他们在其中做类似的事情。我认为 BatchFlushInterval 仅与 send 和 complete 方法有关,但我不确定如何。如果我找不到内置解决方案,我会标记为答案。

以上是关于如何每隔 X 秒批量处理来自服务总线队列的消息的主要内容,如果未能解决你的问题,请参考以下文章

如何查看 Azure 服务总线队列中的所有消息?

如何防止重复消息在 WebJob 处理时不插入到服务总线队列中?

Azure 服务总线队列消息处理

读取 Azure 服务总线队列中的所有活动消息

使用azure服务总线,如何将单个消息发布到多个队列?

如何基于 Azure 中的服务总线队列自动缩放 Python webjob?