如何使用 Java 将错误消息移动到 Azure 死信队列(主题 - 订阅)?
Posted
技术标签:
【中文标题】如何使用 Java 将错误消息移动到 Azure 死信队列(主题 - 订阅)?【英文标题】:How to move error message to Azure dead letter queue(Topics - Subscription) using Java? 【发布时间】:2021-04-11 20:11:38 【问题描述】:我需要将消息从 Azure 主题订阅发送到死信队列,以防在读取和处理来自主题的消息时出现任何错误。所以我尝试测试直接向DLQ推送消息。
我的示例代码会是这样的
static void sendMessage()
// create a Service Bus Sender client for the queue
ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.sender()
.topicName(topicName)
.buildClient();
// send one message to the topic
senderClient.sendMessage(new ServiceBusMessage("Hello, World!"));
static void resceiveAsync()
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.topicName(topicName)
.subscriptionName(subName)
.buildAsyncClient();
// receive() operation continuously fetches messages until the subscription is disposed.
// The stream is infinite, and completes when the subscription or receiver is closed.
Disposable subscription = receiver.receiveMessages().subscribe(message ->
System.out.printf("Id: %s%n", message.getMessageId());
System.out.printf("Contents: %s%n", message.getBody().toString());
, error ->
System.err.println("Error occurred while receiving messages: " + error);
, () ->
System.out.println("Finished receiving messages.");
);
// Continue application processing. When you are finished receiving messages, dispose of the subscription.
subscription.dispose();
// When you are done using the receiver, dispose of it.
receiver.close();
我尝试获取死信队列路径
String dlq = EntityNameHelper.formatDeadLetterPath(topicName);
我得到了死信队列的路径,例如 = "mytopic/$deadletterqueue"
但是在将路径作为主题名称传递时它不起作用。它抛出一个Entity topic not found异常。
谁能给我建议
参考: How to move error message to Azure dead letter queue using Java?
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#moving-messages-to-the-dlq
How to push the failure messages to Azure service bus Dead Letter Queue in Spring Boot Java?
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-topics-subscriptions-legacy#receive-messages-from-a-subscription
【问题讨论】:
【参考方案1】:您可能知道,如果您在处理过程中抛出异常,并且超出了最大传递计数,则消息将自动移至死信队列。如果您想明确地将消息移动到 DLQ,您也可以这样做。一个常见的情况是,如果您知道消息由于其内容而永远不会成功。
您不能直接向 DLQ 发送 新 消息,因为这样您的系统中就会有两条消息。您需要对父实体调用特殊操作。此外,<topic path>/$deadletterqueue
不起作用,因为这将是所有订阅的 DLQ。正确的实体路径是这样构建的:
<queue path>/$deadletterqueue
<topic path>/Subscriptions/<subscription path>/$deadletterqueue
https://github.com/Azure/azure-service-bus/blob/master/samples/Java/azure-servicebus/DeadletterQueue/src/main/java/com/microsoft/azure/servicebus/samples/deadletterqueue/DeadletterQueue.java
此示例代码适用于队列,但您应该能够很容易地将其适应主题:
// register the RegisterMessageHandler callback
receiver.registerMessageHandler(
new IMessageHandler()
// callback invoked when the message handler loop has obtained a message
public CompletableFuture<Void> onMessageAsync(IMessage message)
// receives message is passed to callback
if (message.getLabel() != null &&
message.getContentType() != null &&
message.getLabel().contentEquals("Scientist") &&
message.getContentType().contentEquals("application/json"))
// ...
else
return receiver.deadLetterAsync(message.getLockToken());
return receiver.completeAsync(message.getLockToken());
// callback invoked when the message handler has an exception to report
public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase)
System.out.printf(exceptionPhase + "-" + throwable.getMessage());
,
// 1 concurrent call, messages are auto-completed, auto-renew duration
new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
executorService);
【讨论】:
以上是关于如何使用 Java 将错误消息移动到 Azure 死信队列(主题 - 订阅)?的主要内容,如果未能解决你的问题,请参考以下文章
azure-notificationhubs IOS 错误请求