读取 Azure 服务总线队列中的所有活动消息

Posted

技术标签:

【中文标题】读取 Azure 服务总线队列中的所有活动消息【英文标题】:Read all active messages in azure service bus queue 【发布时间】:2021-08-16 02:33:06 【问题描述】:

我是 azure 服务总线的新手,我应该将消息推送到队列,然后有一个单独的计划任务将读取此队列中的所有活动消息并将它们批量导入 sql 我之前尝试过这段代码,当我在发送消息后立即调用它时它正在工作,但现在它在单独的计划任务中不起作用。 任何帮助为什么或我可以使用什么来批量阅读消息或这是不可能的

queueClient = new QueueClient(conn, queuename, ReceiveMode.ReceiveAndDelete);

                var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                
                    MaxConcurrentCalls = 1,
                    AutoComplete = false
                ;
                queueClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);


public async Task ReceiveMessagesAsync(Message message, CancellationToken token)
        
            messages.Add(message.Body.ToString());
            Console.WriteLine($"Received message: Encoding.UTF8.GetString(message.Body)");
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);
        

        public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        
            Console.WriteLine(exceptionReceivedEventArgs.Exception);
            return Task.CompletedTask;
        

【问题讨论】:

【参考方案1】:

最好知道您使用什么库来执行此操作,但我建议使用https://www.nuget.org/packages/Azure.Messaging.ServiceBus/ nuget 包,您可以:

class Program

    static async Task Main(string[] args)
    
        string queueName = "myqueue";
        var client = new ServiceBusClient("myconn");
        // create a processor that we can use to process the messages
        var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;
        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;
        await processor.StartProcessingAsync();
        // Process messages for 5 minutes 
        await Task.Delay(TimeSpan.FromMinutes(5));
        // stop processing 
        Console.WriteLine("Stopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    
    private static Task ErrorHandler(ProcessErrorEventArgs arg)
    
        // Here you can catch errors;
        return Task.CompletedTask;
    
    static async Task MessageHandler(ProcessMessageEventArgs args)
    
        // Do something with the message .e.g deserialize it and insert to SQL
        try
        
            BinaryData content = args.Message.Body;
            // Here you can use :
            string contentStr = content.ToString(); // This would be your data
        
        catch (Exception e)
        
            // If something goes wrong you should abandon the message
            await args.AbandonMessageAsync(args.Message);
        
        await args.CompleteMessageAsync(args.Message);
    

这将处理所有消息,直到时间结束。这对于计划任务很有用。如果您想在处理完特定数量的消息后停止,可以执行异步循环并检查是否已达到该数量。

【讨论】:

但是如果我需要原始函数中的contentStr 怎么办?在这种情况下,它是 Main 但如果这是一个 HTTP 端点并且我需要将消息正文返回给调用者怎么办? 通常不建议将服务总线与 HTTP 端点混合使用,它们的工作方式非常不同。服务总线是异步的,你不应该像在 HTTP 上那样等待响应。如果您需要在 HTTP 客户端(例如浏览器)上接收服务总线消息,我建议您查看 SignalR 之类的东西,在这种情况下,您将收到消息并通过集线器通知客户端。看看docs.particular.net/samples/near-realtime-clients。

以上是关于读取 Azure 服务总线队列中的所有活动消息的主要内容,如果未能解决你的问题,请参考以下文章

Azure 服务总线中的死信队列中的消息是不是过期?

Azure 服务总线向队列中添加消息的速度过快

如何基于 Azure 中的服务总线队列自动缩放 Python webjob?

无法从 Azure 服务总线中的并发会话按顺序接收消息

C# 消息队列-Microsoft Azure service bus 服务总线

逻辑应用中的 Azure 服务总线队列错误