从死信队列重新提交消息 - Azure 服务总线

Posted

技术标签:

【中文标题】从死信队列重新提交消息 - Azure 服务总线【英文标题】:Resubmitting a message from dead letter queue - Azure Service Bus 【发布时间】:2017-06-07 12:11:27 【问题描述】:

我在 Azure 中创建了一个服务总线队列,它运行良好。如果消息在默认尝试(10 次)内没有被传递,它会正确地将消息移动到死信队列。

现在,我想将此消息从死信队列重新提交回它的起源队列,看看它是否再次起作用。我已经尝试过使用服务总线资源管理器。但它会立即移至死信队列。

是否可以这样做,如果可以,怎么做?

【问题讨论】:

【参考方案1】:

您需要发送具有相同负载的新消息。 ASB 设计不支持消息重新提交。

【讨论】:

【参考方案2】:

当您修复并重新提交死信队列中的消息时,服务总线资源管理器工具始终会创建原始消息的克隆。它不可能有任何不同,因为默认情况下服务总线消息传递不提供任何消息修复和重新提交机制。我建议您调查为什么您的邮件在重新提交时最终会出现在死信队列中以及它的克隆中。希望这会有所帮助!

【讨论】:

原始邮件是死信,因为它尝试传递 10 次,并且由于某种原因当时服务不可用。但现在服务又恢复了。消息的克隆也显示与原始消息相同的错误,并且通过一次尝试,计数已经是 11。所以我假设传递的消息的计数没有在克隆消息中重置【参考方案3】:

听起来这可能与 ASB 的“重复消息检测”功能有关。

当您在 ServiceBus Explorer 中重新提交消息时,它将克隆该消息,因此新消息将与死信队列中的原始消息具有相同的 Id。

如果您在队列/主题上启用了“需要重复检测”并且您尝试在“重复检测历史时间窗口”内重新提交消息,那么该消息将立即再次移动到死信队列。

如果您想使用服务总线资源管理器重新提交死信消息,那么我认为您必须在队列/主题上禁用“需要重复检测”。

【讨论】:

【参考方案4】:

这可能是Peter Berggreen 所指示的“重复消息检测”,或者如果您直接将 BrokeredMessage 从死信队列移动到活动队列,那么 DeliveryCount 仍将处于最大值并且它会返回到死信更有可能信件队列。

将 BrokeredMessage 从死信队列中拉出,使用 GetBody() 获取内容,使用该数据创建新的 BrokeredMessage 并将其发送到队列。您可以安全地执行此操作,方法是使用 peek 从死信队列中获取消息内容,然后在从死信队列中删除消息之前将新消息发送到活动队列。这样,如果由于某种原因无法写入实时队列,您就不会丢失任何关键数据。

使用新的 BrokeredMessage,您应该不会遇到“重复消息检测”问题,并且 DeliveryCount 将重置为零。

【讨论】:

难道你不能通过从死信队列接收消息但在成功写入实时队列后才使用它来实现安全吗?似乎它的实现会更简单一些。 是的,“收不消费”本质上是在偷看消息。【参考方案5】:

尝试删除死信原因

resubmittableMessage.Properties.Remove("DeadLetterReason");
resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");

完整代码

using Microsoft.ServiceBus.Messaging;
using System.Transactions;

namespace ResubmitDeadQueue

    class Program
    
        static void Main(string[] args)
        

            var connectionString = "";
            var queueName = "";

            var queue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName), ReceiveMode.PeekLock);

            BrokeredMessage originalMessage
                ;
            var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
            do
            
                originalMessage = queue.Receive();
                if (originalMessage != null)
                
                    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
                    
                        // Create new message
                        var resubmittableMessage = originalMessage.Clone();

                        // Remove dead letter reason and description
                        resubmittableMessage.Properties.Remove("DeadLetterReason");
                        resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");

                        // Resend cloned DLQ message and complete original DLQ message
                        client.Send(resubmittableMessage);
                        originalMessage.Complete();

                        // Complete transaction
                        scope.Complete();
                    
                
             while (originalMessage != null);
        
    

感谢这里的其他一些回复!

【讨论】:

我用主队列中的 0 条消息,DLQ 中的 1 条消息尝试了确切的代码。运行代码后,我在 main 和 DLQ 中都没有看到任何消息。【参考方案6】:

我们有一批大约 6 万条消息,需要从死信队列中重新处理。通过服务总线资源管理器查看和发送回消息需要大约 6 分钟/来自我的机器的每 1k 条消息。我通过将 DLQ 消息的转发规则设置到另一个队列并从那里自动将其转发到原始队列来解决了这个问题。此解决方案需要大约 30 秒才能处理所有 60k 条消息。

【讨论】:

能否分享一些链接,了解如何在 dlq 上设置自动转发以发送到新主题? 重新处理死信消息后不要忘记删除转发。 使用 C# var client = new ServiceBusAdministrationClient(connectionString); var sub = client.GetSubscriptionAsync(topicName, subscriptionName).Result; sub.Value.ForwardDeadLetteredMessagesTo = "myqueue"; sub= client.UpdateSubscriptionAsync(subscription).Result; 使用 Powershell 和 AzServiceBus $sub = Get-AzServiceBusSubscription -ResourceGroupName MyRG -Namespace MyNS -Topic MyT -Subscription MyS $sub.ForwardDeadLetteredMessagesTo = "myqueue" Set-AzServiceBusSubscription -ResourceGroupName MyRG -Namespace MyNS -Topic MyT -InputObject $sub 从一个队列转发到另一个队列的功能在 BASIC 模式下不可用。只是在人们尝试之前注意一下。【参考方案7】:

我们经常需要重新提交消息。 @Baglay-Vyacheslav 的回答很有帮助。我粘贴了一些更新的 C# 代码,这些代码可与最新的 Azure.Messaging.ServiceBus Nuget 包一起使用。

使得在两个队列/主题/订阅者上处理 DLQ 变得更快/更容易。

using Azure.Messaging.ServiceBus;
using System.Collections.Generic;
using System.Threading.Tasks;
using NLog;

namespace ServiceBus.Tools

    class TransferDeadLetterMessages
    
        // https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.ServiceBus_7.2.1/sdk/servicebus/Azure.Messaging.ServiceBus/README.md

        private static Logger logger = LogManager.GetCurrentClassLogger();

        private static ServiceBusClient client;
        private static ServiceBusSender sender;
    
        public static async Task ProcessTopicAsync(string connectionString, string topicName, string subscriberName, int fetchCount = 10)
        
            try
            
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(topicName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(topicName, subscriberName, new ServiceBusReceiverOptions
                
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                );

                await ProcessDeadLetterMessagesAsync($"topic: topicName -> subscriber: subscriberName", fetchCount, sender, dlqReceiver);
            
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                
                    logger.Error(ex, $"Topic:Subscriber 'topicName:subscriberName' not found. Check that the name provided is correct.");
                
                else
                
                    throw;
                
            
            finally
            
                await sender.CloseAsync();
                await client.DisposeAsync();
            
        

        public static async Task ProcessQueueAsync(string connectionString, string queueName, int fetchCount = 10)
                 
            try
            
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(queueName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
                
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                );

                await ProcessDeadLetterMessagesAsync($"queue: queueName", fetchCount, sender, dlqReceiver);
            
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                
                    logger.Error(ex, $"Queue 'queueName' not found. Check that the name provided is correct.");
                
                else
                
                    throw;
                
            
            finally
            
                await sender.CloseAsync();
                await client.DisposeAsync();
            
        

        private static async Task ProcessDeadLetterMessagesAsync(string source, int fetchCount, ServiceBusSender sender, ServiceBusReceiver dlqReceiver)
        
            var wait = new System.TimeSpan(0, 0, 10);

            logger.Info($"fetching messages (wait.TotalSeconds seconds retrieval timeout)");
            logger.Info(source);

            IReadOnlyList<ServiceBusReceivedMessage> dlqMessages = await dlqReceiver.ReceiveMessagesAsync(fetchCount, wait);

            logger.Info($"dl-count: dlqMessages.Count");

            int i = 1;

            foreach (var dlqMessage in dlqMessages)
            
                logger.Info($"start processing message i");
                logger.Info($"dl-message-dead-letter-message-id: dlqMessage.MessageId");
                logger.Info($"dl-message-dead-letter-reason: dlqMessage.DeadLetterReason");
                logger.Info($"dl-message-dead-letter-error-description: dlqMessage.DeadLetterErrorDescription");

                ServiceBusMessage resubmittableMessage = new ServiceBusMessage(dlqMessage);

                await sender.SendMessageAsync(resubmittableMessage);

                await dlqReceiver.CompleteMessageAsync(dlqMessage);

                logger.Info($"finished processing message i");
                logger.Info("--------------------------------------------------------------------------------------");

                i++;
            

            await dlqReceiver.CloseAsync();

            logger.Info($"finished");
        
    

【讨论】:

以上是关于从死信队列重新提交消息 - Azure 服务总线的主要内容,如果未能解决你的问题,请参考以下文章

Azure 服务总线中的死信队列中的消息是不是过期?

将死信队列中的所有消息移回订阅的主队列

Azure SB 队列将消息发送到禁用的死信队列

从逻辑应用程序设置死信原因

如何在服务总线队列触发功能中将服务总线消息移动到死信

从 Azure 函数将消息写入 Azure 服务总线队列