带有事务的长期运行作业 Azure 服务总线的模式

Posted

技术标签:

【中文标题】带有事务的长期运行作业 Azure 服务总线的模式【英文标题】:Pattern for long running jobs Azure Service Bus with transactions 【发布时间】:2015-11-03 01:00:36 【问题描述】:

我想将事务与 Azure 服务总线一起使用,但我有一些消息是网络/API 绑定的,我根本无法可靠地进一步重构它们以使它们在 5 分钟内完成 - 允许的最大 PeekLock 持续时间。

我找不到任何允许我扩展锁的 API,所以也许还有另一种模式。

一种可能的解决方案:

1) 使用现有的实现来接收消息。如果从需要长时间运行事务的主题/队列中获取 - 使用新的 ScheduledEnqueueTimeUtc 更新消息并发送回服务总线。

myMessage.ScheduledEnqueueTimeUtc = TimeSpan.FromMinutes(actualLockDuration);
serviceBusClient.PublishMessage(topic, myMessage);

2) 通过 MessageId 获取特定消息并将该新消息标记为完成。

if (oldMessage.LockedUntilUtc > DateTime.UtcNow) 
  var message = FetchMessage(oldMessage.MessageId);
  message.Complete();
 else 
  oldMessage.Complete();


再想一想,在寻找通过 messageId 获取消息的 API 之后 - 我没有看到。如果我通过序列 ID 获取,那么我需要一种在步骤 1 之后获取序列 ID 的方法 - 然后我需要重新考虑一些内部系统(大型消息处理、消息记录和关联等)

【问题讨论】:

"序列号的范围是队列或主题。该值仅适用于从服务总线接收到的消息。"我将无法使用 Receive(sequenceNumber) API,因为如果我重新发布要稍后获取的消息,我将不知道该号码。 msdn.microsoft.com/en-us/library/… 【参考方案1】:

我不知道我是怎么错过的。 BrokeredMessage.RenewLock

我围绕它写了一个小异步包装器来更新直到最长持续时间。

public static async Task<ProcessMessageReturn> RenewLockAfter(this Task<ProcessMessageReturn> processTask, BrokeredMessage message, int maxDuration)

    var ss = new SemaphoreSlim(2);
    var startTime = DateTime.UtcNow;
    var trackedTasks = new List<Task> processTask;
    var timeoutCancellationTokenSource = new CancellationTokenSource();

    while (true)
    
        ss.Wait(timeoutCancellationTokenSource.Token);

        if (startTime.AddMinutes(maxDuration) < DateTime.UtcNow)
        
            var task = Task.Run(async () =>
            
                await Task.Delay(TimeSpan.FromTicks(message.LockedUntilUtc.Ticks - DateTime.UtcNow.AddSeconds(30).Ticks), timeoutCancellationTokenSource.Token);
                await message.RenewLockAsync();
                ss.Release();
            , timeoutCancellationTokenSource.Token);
            trackedTasks.Add(task);
        


        var completedTask = await Task.WhenAny(trackedTasks);
        if (completedTask != processTask) continue;

        timeoutCancellationTokenSource.Cancel();
        return processTask.Result;
    


【讨论】:

我用SemaphoreSlim(1) 只做主任务和一个renewLock 任务。在if (startTime ... 中,我不得不将&lt; 更改为&gt;。这是一个真正优雅的解决方案。

以上是关于带有事务的长期运行作业 Azure 服务总线的模式的主要内容,如果未能解决你的问题,请参考以下文章

具有事务范围的 Azure 服务总线

发件人停止运行时 Azure 服务总线队列 ScheduledEnqueueTimeUtc 的行为

Azure 服务总线:使用函数、服务结构和 Web 作业? [关闭]

Azure 服务总线 - 发布到队列和事务范围内的主题

来自 azure template.json 中流分析作业模板的服务总线的 sharedAccessPolicyKey

Azure 服务总线队列性能