Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux
Posted
技术标签:
【中文标题】Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux【英文标题】:Azure ServiceBusSessionReceiverAsyncClient - Mono instead of Flux 【发布时间】:2022-01-05 09:46:05 【问题描述】:我有一个 Spring Boot 应用,我从 Azure 服务总线队列会话中收到一条消息。
代码是:
@Autowired
ServiceBusSessionReceiverAsyncClient apiMessageQueueIntegrator;
.
.
.
Mono<ServiceBusReceiverAsyncClient> receiverMono = apiMessageQueueIntegrator.acceptSession(sessionid);
Disposable subscription = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages(),
receiver -> Mono.fromRunnable(() -> receiver.close()))
.subscribe(message ->
// Process message.
logger.info(String.format("Message received from quque. Session id: %s. Contents: %s%n", message.getSessionId(),
message.getBody()));
receivedMessage.setReceivedMessage(message);
timeoutCheck.countDown();
, error ->
logger.info("Queue error occurred: " + error);
);
由于我只收到来自会话的一条消息,因此我在收到消息后使用 CountDownLatch(1) 处理订阅。
库的文档说,如果我只期望一条消息,则可以使用 Mono.usingWhen 而不是 Flux.usingWhen,但我无法在任何地方找到任何示例,而且我无法弄清楚如何重写这段代码来做到这一点。
如果我改用 Mono.usingWhen,粘贴的代码会怎样?
【问题讨论】:
是的,我在 GitHub 上得到了答案(你在下面转发了)。感谢转发! 【参考方案1】:谢谢conniey。发布您的建议作为帮助其他社区成员的答案。
默认情况下
receiveMessages()
是一个 Flux,因为我们认为来自会话的消息是“无限长的”。在您的情况下,您只需要流中的第一条消息,因此我们使用next()
运算符。
倒计时锁存器的使用可能不是最好的方法。在示例中,我们有一个,因此程序在收到消息之前不会结束。
.subscribe
不是阻塞调用,它设置处理程序并移至下一行代码。
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptSession("greetings-id");
Mono<ServiceBusReceivedMessage> singleMessageMono = Mono.usingWhen(receiverMono,
receiver ->
// Anything you wish to do with the receiver.
// In this case we only want to take the first message, so we use the "next" operator. This returns a
// Mono.
return receiver.receiveMessages().next();
,
receiver -> Mono.fromRunnable(() -> receiver.close()));
try
// Turns this into a blocking call. .block() waits indefinitely, so we have a timeout.
ServiceBusReceivedMessage message = singleMessageMono.block(Duration.ofSeconds(30));
if (message != null)
// Process message.
catch (Exception error)
System.err.println("Error occurred: " + error);
可以参考GitHub问题:ServiceBusSessionReceiverAsyncClient - Mono instead of Flux
【讨论】:
以上是关于Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux的主要内容,如果未能解决你的问题,请参考以下文章
Azure.Cosmos 还是 Microsoft.Azure.Cosmos、Azure.Storage.Blob 还是 Microsoft.Azure.Storage.Blob?适用于 .NET C
如何向 Azure 存储授予对 Azure Pipeline 的 Azure 文件副本的访问权限?