Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux

Posted

技术标签:

【中文标题】Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux【英文标题】:Azure ServiceBusSessionReceiverAsyncClient - Mono instead of Flux 【发布时间】:2022-01-05 09:46:05 【问题描述】:

我有一个 Spring Boot 应用,我从 Azure 服务总线队列会话中收到一条消息。

代码是:

@Autowired
ServiceBusSessionReceiverAsyncClient apiMessageQueueIntegrator;
.
.
.
Mono<ServiceBusReceiverAsyncClient> receiverMono = apiMessageQueueIntegrator.acceptSession(sessionid);
        Disposable subscription = Flux.usingWhen(receiverMono,
                receiver -> receiver.receiveMessages(),
                receiver -> Mono.fromRunnable(() -> receiver.close()))
                .subscribe(message -> 
                    // Process message.
                    logger.info(String.format("Message received from quque. Session id: %s. Contents: %s%n", message.getSessionId(),
                        message.getBody()));
                    receivedMessage.setReceivedMessage(message);
                    timeoutCheck.countDown();

                , error -> 
                    logger.info("Queue error occurred: " + error);
                );

由于我只收到来自会话的一条消息,因此我在收到消息后使用 CountDownLatch(1) 处理订阅。

库的文档说,如果我只期望一条消息,则可以使用 Mono.usingWhen 而不是 Flux.usingWhen,但我无法在任何地方找到任何示例,而且我无法弄清楚如何重写这段代码来做到这一点。

如果我改用 Mono.usingWhen,粘贴的代码会怎样?

【问题讨论】:

是的,我在 GitHub 上得到了答案(你在下面转发了)。感谢转发! 【参考方案1】:

谢谢conniey。发布您的建议作为帮助其他社区成员的答案。

默认情况下receiveMessages() 是一个 Flux,因为我们认为来自会话的消息是“无限长的”。在您的情况下,您只需要流中的第一条消息,因此我们使用 next() 运算符。

倒计时锁存器的使用可能不是最好的方法。在示例中,我们有一个,因此程序在收到消息之前不会结束。 .subscribe 不是阻塞调用,它设置处理程序并移至下一行代码。

Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptSession("greetings-id");
Mono<ServiceBusReceivedMessage> singleMessageMono = Mono.usingWhen(receiverMono,
    receiver -> 
        // Anything you wish to do with the receiver.
        // In this case we only want to take the first message, so we use the "next" operator. This returns a
        // Mono.
        return receiver.receiveMessages().next();
    ,
    receiver -> Mono.fromRunnable(() -> receiver.close()));

try 
    // Turns this into a blocking call. .block() waits indefinitely, so we have a timeout.
    ServiceBusReceivedMessage message = singleMessageMono.block(Duration.ofSeconds(30));
    if (message != null) 
        // Process message.

    
 catch (Exception error) 
    System.err.println("Error occurred: " + error);

可以参考GitHub问题:ServiceBusSessionReceiverAsyncClient - Mono instead of Flux

【讨论】:

以上是关于Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux的主要内容,如果未能解决你的问题,请参考以下文章

Azure.Cosmos 还是 Microsoft.Azure.Cosmos、Azure.Storage.Blob 还是 Microsoft.Azure.Storage.Blob?适用于 .NET C

如何向 Azure 存储授予对 Azure Pipeline 的 Azure 文件副本的访问权限?

Azure手把手系列5:Azure帐户和订阅

Azure Backup 使用Azure恢复服务,备份Azure虚拟机

集成 Azure 服务总线主题和 Azure 函数

从 Azure 函数将消息写入 Azure 服务总线队列