无法从 Azure 服务总线中的并发会话按顺序接收消息

Posted

技术标签:

【中文标题】无法从 Azure 服务总线中的并发会话按顺序接收消息【英文标题】:Not able to receive messages in sequential order from concurrent session in Azure Service Bus 【发布时间】:2021-09-22 11:53:48 【问题描述】:

我是 Azure 服务总线的新手,并试图了解启用会话的队列是如何工作的。

我有 3 个控制台应用程序:1 个消息发送器应用程序,用于将消息发送到队列和 2 个消息接收器应用程序。 Azure 服务总线队列已启用会话,我正在尝试实现并发会话。

每个运行消息发送者控制台应用程序的用户都会向队列发送一条消息,并且该消息必须被两个接收者控制台应用程序接收。 并且接收方应用程序必须按顺序接收/处理每条消息,首先是 Receiver1,然后是 Receiver2。为此,我标记了每条消息 在发送时使用接收方应用程序名称,然后在接收方代码中检查接收方应用程序要接收的标签名称。

以下是我用于发送和接收消息的代码。

消息发送者代码:

public static int Main(string[] args)

    MainAsync().GetAwaiter().GetResult();


public async Task MainAsync()

    await SendSessionMessagesAsync();


public async Task SendSessionMessagesAsync()

    // Below userList coming from db
    foreach (var user in userList)
    
        List<ReceiverApplicationName> receiverApplicationNameList = new List<ReceiverApplicationName>();
        var receiverApplicationNames = //getting the ReceiverApplicationName list from appsettings.json, [ "Receiver1", "Receiver2" ]
        foreach (var item in receiverApplicationNames)
        
            ReceiverApplicationName receiverApplicationName = new ReceiverApplicationName();
            receiverApplicationName.UserId = user.Id;
            receiverApplicationName.ApplicationName = item;
            receiverApplicationNameList.Add(receiverApplicationName);
        
        foreach (var queueMessageItem in receiverApplicationNameList)
        
            var messageBody = JsonConvert.SerializeObject(queueMessageItem);
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));
            message.SessionId = "Session " + queueMessageItem.UserId;
            Console.WriteLine($"Sending message for UserId: queueMessageItem.UserId, SessionId: message.SessionId, Message: messageBody");
            await queueClient.SendAsync(message); // 2 messages(Receiver1 and Receiver2) for each user
        
    

消息接收方代码(Receiver1):

public static int Main(string[] args)

    MainAsync().GetAwaiter().GetResult();


public async Task MainAsync()

    RegisterOnSessionHandlerAndReceiveSessionMessages();


public void RegisterOnSessionHandlerAndReceiveSessionMessages()

    var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
    
        MaxConcurrentSessions = 2,
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        AutoComplete = false,
    ;
    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);


public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)

    _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
    return Task.CompletedTask;


public async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)

    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(Encoding.UTF8.GetString(message.Body));
    if (result.ReceiverApplicationName == "Receiver1")
    
        Console.WriteLine($"Received message for UserId: result.UserId, Session: session.SessionId, Message: SequenceNumber: message.SystemProperties.SequenceNumber, Body:Encoding.UTF8.GetString(message.Body), at: DateTime.Now");
        await session.CompleteAsync(message.SystemProperties.LockToken);
    

消息接收方代码(Receiver2):

public static int Main(string[] args)

    MainAsync().GetAwaiter().GetResult();


public async Task MainAsync()

    RegisterOnSessionHandlerAndReceiveSessionMessages();


public void RegisterOnSessionHandlerAndReceiveSessionMessages()

    var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler)
    
        MaxConcurrentSessions = 2,
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        AutoComplete = false,
    ;
    queueClient.RegisterSessionHandler(ProcessSessionMessagesAsync, sessionHandlerOptions);


public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)

    _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
    return Task.CompletedTask;


public async Task ProcessSessionMessagesAsync(IMessageSession session, Message message, CancellationToken token)

    var result = JsonConvert.DeserializeObject<ReceiverApplicationName>(Encoding.UTF8.GetString(message.Body));
    if (result.ReceiverApplicationName == "Receiver2")
    
        Console.WriteLine($"Received message for UserId: result.UserId, Session: session.SessionId, Message: SequenceNumber: message.SystemProperties.SequenceNumber, Body:Encoding.UTF8.GetString(message.Body), at: DateTime.Now");
        await session.CompleteAsync(message.SystemProperties.LockToken);
    

结果: 我无法在会话 ID 中按顺序接收消息。 Receiver1 和 Receiver2 收到消息的时间戳不按顺序排列。例如,在 UserId = 2 时,Receiver1 应该在 Receiver2 应用程序接收到消息之前接收到消息。在会话中,应按添加顺序接收消息,否则我不正确。

消息发送方输出: MessageSender

接收器1输出: Receiver1

接收器2输出: Receiver2

【问题讨论】:

由于多个接收器同时运行,您不能期望消息以任何特定顺序到达。如果您的设计依赖于按顺序处理的消息,您应该重新审视您的设计。 【参考方案1】:

来自给定会话的消息只能由单个消费者处理。您无法扩展单个会话的处理。文档清楚地说明了这一点:

当客户端接受并持有会话时,客户端会在队列或订阅中对所有具有该会话的会话 ID 的消息持有排他锁。它还将对所有具有稍后到达的会话 ID 的消息进行排他锁。

详情请参阅服务总线documentation。

【讨论】:

以上是关于无法从 Azure 服务总线中的并发会话按顺序接收消息的主要内容,如果未能解决你的问题,请参考以下文章

Azure 服务总线主题订阅者接收订单

从服务总线队列接收消息时出现问题

函数应用无法从 Azure 服务总线获取消息

失败的消息不会从 Azure 服务总线触发函数移动到 DLQ

用于会话的 Azure WebJob ServiceBusTrigger

ASP.NET Core Web 应用程序作为 Azure 服务总线使用者/接收器