Azure:如何将消息从毒队列移回主队列?

Posted

技术标签:

【中文标题】Azure:如何将消息从毒队列移回主队列?【英文标题】:Azure: How to move messages from poison queue to back to main queue? 【发布时间】:2016-01-20 00:43:27 【问题描述】:

我想知道是否有可以在队列之间移动消息的工具或库? 目前,我正在做类似下面的事情

public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)

    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("myqueue");
    queue.CreateIfNotExists();

    var messageData = JsonConvert.SerializeObject(data, new JsonSerializerSettings  ContractResolver = new CamelCasePropertyNamesContractResolver() );
    queue.AddMessage(new CloudQueueMessage(messageData));

【问题讨论】:

【参考方案1】:

基本上 Azure 存储不支持将消息从一个队列移动到另一个队列。您需要自己执行此操作。

实现将消息从一个队列移动到另一个队列的一种方法是将消息从源队列中取出(通过调用GetMessages),读取消息的内容,然后在目标队列中创建一条新消息。这可以通过使用存储客户端库来完成。

我想到的一个用于移动消息的工具是Cerebrata Azure Management Studio(付费产品,可免费试用 15 天)。它有这个功能。

截至 (2018-09-11) 版本 1.4.1 的 Microsoft Azure Storage Explorer 不支持移动队列消息。

【讨论】:

Azure 存储资源管理器在撰写本文时似乎不支持这一点。【参考方案2】:

截至 (2018-09-11) 版本 1.4.1 的 Microsoft Azure Storage Explorer 无法将消息从一个 Azure 队列移动到另一个。

我blogged 一个简单的解决方案将有害消息传输回原始队列,并认为它可能会为某人节省几分钟。显然,您需要修复导致消息进入有害消息队列的错误!

您需要将 NuGet 包引用添加到 Microsoft.NET.Sdk.Functions:

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

void Main()

    const string queuename = "MyQueueName";

    string storageAccountString = "xxxxxx";

    RetryPoisonMesssages(storageAccountString, queuename);


private static int RetryPoisonMesssages(string storageAccountString, string queuename)

    CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

    int count = 0;
    while (true)
    
        var msg = poisonqueue.GetMessage();
        if (msg == null)
            break;

        poisonqueue.DeleteMessage(msg);
        targetqueue.AddMessage(msg);
        count++;
    

    return count;


private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)

    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(queuename);

    return queue;

【讨论】:

太好了,谢谢。如果您从控制台运行它,添加WindowsAzure.Storage 就足够了。不过,您需要调用 GetMessageDeleteMessageAddMessage 异步,因为这些非异步方法已从库中删除。【参考方案3】:

这是一个您可能会发现有用的 python 脚本。你需要安装azure-storage-queue

queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    while queueService.peek_messages(queue.name):
      for message in queueService.get_messages(queue.name, 32):
        print(".", end="", flush=True)
        queueService.put_message(targetQueueName, message.content)
        queueService.delete_message(queue.name, message.id, message.pop_receipt)

【讨论】:

我收到此错误:异常:[AttributeError] ("'QueueServiceClient' object has no attribute 'peek_messages'",)【参考方案4】:

致任何来这里寻找与使用 Azure 函数的 @MitchWheats 答案等效的 Node 的人。

import AzureStorage from 'azure-storage'
import  Context, HttpRequest  from '@azure/functions'
import util from 'util'

const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()

const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)

export async function run (context: Context, req: HttpRequest): Promise<void> 
  try 
    const poisonQueue = (req.query.queue || (req.body && req.body.queue));
    const targetQueue = poisonQueue.split('-')[0]

    let count = 0

    while (true) 
      const message = await getMessage(poisonQueue)
      if (!message)  break; 
      if (message.messageText && message.messageId && message.popReceipt) 
        await createMessage(targetQueue, message.messageText)
        await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
      
      count++
    

    context.res = 
      body: `Replayed $count messages from $poisonQueue on $targetQueue`
    ;
   catch (e) 
    context.res =  status: 500 
  

要使用该功能,您需要提供用于存储队列的存储帐户的连接信息。这是作为环境变量提供的。您要么提供AZURE_STORAGE_ACCOUNTAZURE_STORAGE_ACCESS_KEY,要么提供AZURE_STORAGE_CONNECTION_STRING。更多信息请访问Azure Storage SDK docs。

在Medium article也写了几行

【讨论】:

【参考方案5】:

这是 Mitch 答案的更新版本,使用最新的 Microsoft.Azure.Storage.Queue 包。只需创建一个新的 .NET Console 应用程序,将上述包添加到其中,并将 Program.cs 的内容替换为以下内容:

using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;

namespace PoisonMessageDequeuer

    class Program
    
        static async Task Main(string[] args)
        
            const string queuename = "MyQueueName";

            string storageAccountString = "xxx";

            await RetryPoisonMesssages(storageAccountString, queuename);
        

        private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
        
            var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
            var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

            var count = 0;
            while (true)
            
                var msg = await poisonqueue.GetMessageAsync();
                if (msg == null)
                    break;

                await poisonqueue.DeleteMessageAsync(msg);
                await targetqueue.AddMessageAsync(msg);
                
                count++;
            

            return count;
        

        private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
        
            var storageAccount = CloudStorageAccount.Parse(storageAccountString);
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference(queuename);

            return queue;
        
    

如果您处理超过 1000 条消息,它仍然很慢,所以我建议您研究批量 API 以获得更多数量。

【讨论】:

我建议颠倒 DeleteMessageAsync 和 AddMessageAsync 的顺序,这样如果出现问题,您会收到重复的消息而不是丢失的消息? 好建议!在我的情况下,队列消息会触发非关键电子邮件,所以我宁愿删除消息而不是重复消息 是的,您的用例可能会有所不同,但我认为安全的默认设置是获取副本,因为在云中您几乎必须使所有处理都是幂等的,因为您可以提供最好的保证get 是“至少一次”。也许代码中的注释至少能让人们意识到这个决定? 我希望人们不要盲目地复制和运行我的代码,这只是一个如何使用最新 API 实现这一目标的示例,但我认为这样做可能仍然是明智的。完成! 请注意,此代码在最新的存储帐户版本中已损坏。原因是 AddMessageAsync 将覆盖消息上的一些信息,然后 DeleteMessagAsync 将给出 404。更好的解决方案是将值复制到 AddMessageAsync 的新消息中【参考方案6】:

正如Mikael Eliasson 所说,code in IGx89 answer 已损坏,因为

AddMessageAsync 将覆盖消息上的一些信息,然后 DeleteMessagAsync 将给出 404。更好的解决方案是复制 值到 AddMessageAsync 的新消息中

请查看 RetryPoisonMesssages 的增强版本,它能够仅指定消息列表(而不是队列中的所有消息)并允许复制消息而不是移动它们。 它还记录每条消息的成功/失败。

/// <param name="storageAccountString"></param>
/// <param name="queuename"></param>
/// <param name="idsToMove">If not null, only messages with listed IDs will be moved/copied</param>
/// <param name="deleteFromPoisonQueue">if false,  messages will be copied; if true, they will be moved
///Warning: if queue is big, keeping deleteFromPoisonQueue=false can cause the same row 
///from poisonqueue to be copied more than once(the reason is not found yet)</param>
/// <returns></returns>
private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename, string[] idsToMove=null, bool deleteFromPoisonQueue=false)

    var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    var poisonQueueName = queuename + "-poison";
    var poisonqueue = GetCloudQueueRef(storageAccountString, poisonQueueName);

    var count = 0;
    while (true)
    
        var msg = await poisonqueue.GetMessageAsync();
        if (msg == null)
        
            Console.WriteLine("No more messages in a queue " + poisonQueueName);
            break;
        

        string action = "";
        try
        
            if (idsToMove == null || idsToMove.Contains(msg.Id))
            
                var msgToAdd = msg;
                if (deleteFromPoisonQueue)
                
                    //The reason is that AddMessageAsync will overwrite some info on the message and then DeleteMessagAsync will give a 404.
                    //The better solution is to copy the values into a new message for AddMessageAsync 
                     msgToAdd = new CloudQueueMessage(msg.AsBytes);
                

                action = "adding";
                await targetqueue.AddMessageAsync(msgToAdd);
                Console.WriteLine(action + " message ID " + msg.Id);
                if (deleteFromPoisonQueue)
                
                    action = "deleting";
                    await poisonqueue.DeleteMessageAsync(msg);
                
                Console.WriteLine(action + " message ID " + msg.Id);
            
        
        catch (Exception ex)
        
            Console.WriteLine("Error encountered when "+ action + " " + ex.Message + " at message ID " + msg.Id);
        

        count++;
    

    return count;

【讨论】:

【参考方案7】:

根据 Jon Canning 的回答更新了 python:

from azure.storage.queue import QueueServiceClient


queueService = QueueServiceClient.from_connection_string(conn_str="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net")

for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    queue = queueService.get_queue_client(queue=queue.name)
    targetQueue = queueService.get_queue_client(queue=targetQueueName)
    while queue.peek_messages() :
        messages = queue.receive_messages()
        for msg in messages:
            targetQueue.send_message(msg.content)
            queue.delete_message(msg)            

【讨论】:

【参考方案8】:

我只需要再次执行此操作,并花时间将我的片段更新到新的存储 SDK。有关更多信息,请参阅https://www.bokio.se/engineering-blog/how-to-re-run-the-poison-queue-in-azure-webjobs/ 的帖子。

这是我使用的代码

using Azure.Storage.Queues;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AzureQueueTransfer

    internal class Program
    
        // Need Read, Update & Process (full url, can create in storage explorer)
        private const string sourceQueueSAS = ""; 

        // Need Add (full url, can create in storage explorer)
        private const string targetQueueSAS = "";
        private static async Task Main(string[] args)
        
            var sourceQueue = new QueueClient(new Uri(sourceQueueSAS));
            var targetQueue = new QueueClient(new Uri(targetQueueSAS));

            var queuedAny = true;
            while (queuedAny)
            
                Thread.Sleep(30000); // Sleep to make sure we dont build too much backlog so we can process new messages on higher prio than old ones
                queuedAny = false;
                foreach (var message in sourceQueue.ReceiveMessages(maxMessages: 32).Value)
                
                    queuedAny = true;
                    var res = await targetQueue.SendMessageAsync(message.Body);

                    Console.WriteLine($"Transfered: message.MessageId");
                    await sourceQueue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
                

                Console.WriteLine($"Finished batch");
             
        
    


【讨论】:

【参考方案9】:

Azure 存储资源管理器 1.15.0 版现在可以在 2020 年执行此操作。https://github.com/microsoft/AzureStorageExplorer/issues/1064

【讨论】:

但它的速度非常慢。每秒 2 条消息

以上是关于Azure:如何将消息从毒队列移回主队列?的主要内容,如果未能解决你的问题,请参考以下文章

Azure 函数队列触发器:如何设置出列消息的时间延迟

使用azure服务总线,如何将单个消息发布到多个队列?

如何使用 Java 将错误消息移动到 Azure 死信队列(主题 - 订阅)?

如何使用 python 快速将消息发送到 Azure 队列存储?

如何将消息添加到 azure webjobs 队列

如何配置 Azure 服务总线队列以将消息推送到客户端而不进行轮询?