Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置
Posted
技术标签:
【中文标题】Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置【英文标题】:Spring AMQP lazy queue @RabbitListener and simplify direct reply configuration 【发布时间】:2021-10-08 07:31:56 【问题描述】:目前我遇到以下问题: 外部 RabbitMQ 远程服务器首先希望我收到一条异步回复登录消息,然后此远程服务器创建一个广播队列 broadcastQueue.MYUSER_123。从这里我想通过 @RabbitListener 注释来消费,这在代码示例中可以看到。但是我遇到了一个错误,你可以在下面看到。
在调试时,我注意到一个容器在我在代码中执行登录之前启动了这个监听器,因此在连接这个广播队列时它会被拒绝。我在How to Lazy Load RabbitMQ queue using @Lazy in spring boot? 中找到了帖子,但我这边的问题是,自动启动不起作用,并且为侦听器启动了容器。我做错了什么?是否可以创建惰性队列?
还有一点是: 有没有一种简单的方法可以为 RabbitMQ 模板进行直接回复配置,我可以在其中指定回复地址?根据spring amqp代码看来,如果您自己不指定回复地址,您似乎只能进行直接回复连接。所以我需要创建一个自定义容器。
感谢您的帮助。
亲切的问候 斯文
@SpringBootTest
@Slf4j
class DemoAmqpApplicationTests
@RabbitListener(queues="broadcastQueue.MYUSER_123",autoStartup = "false")
public void handleMessage(Object obj)
log.info("",obj);
@Test
void contextLoads() throws InterruptedException
Message<User> login = login();
配置
@Configuration
@Slf4j
@EnableRabbit
public class RabbitMqConfiguration
@Bean
public RabbitAdmin amqpAdmin(RabbitTemplate rabbitTemplate)
return new RabbitAdmin(rabbitTemplate);
@Bean
public RabbitTemplate rabbitTemplate(@NonNull CachingConnectionFactory connectionFactory,
@NonNull MessageConverter messageConverter,
@NonNull Queue inquiryResponseQueue,
@NonNull RabbitTemplateConfigurer configurer)
RabbitTemplate rabbitTemplate = new RabbitTemplate();
configurer.configure(rabbitTemplate, connectionFactory);
String username = connectionFactory.getUsername();
configurePostReceiveProcessor(rabbitTemplate);
rabbitTemplate.setMessageConverter(messageConverter);
configurePrepareSendingProcessor(rabbitTemplate, username, inquiryResponseQueue.getName());
configureReply(rabbitTemplate, inquiryResponseQueue);
return rabbitTemplate;
private void configureReply(RabbitTemplate rabbitTemplate,
@NonNull Queue inquiryResponseQueue)
rabbitTemplate.setReplyAddress(inquiryResponseQueue.getName());
rabbitTemplate.setDefaultReceiveQueue(inquiryResponseQueue.getName());
@Bean
public SimpleMessageListenerContainer replyListenerContainer(
@NonNull CachingConnectionFactory connectionFactory,
@NonNull List<Queue> inquiryResponseQueue,
@NonNull RabbitTemplate rabbitTemplate,
@NonNull RabbitAdmin rabbitAdmin)
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(inquiryResponseQueue.get(0));
container.setMessageListener(rabbitTemplate);
return container;
@Bean
public Queue privateInquiryResponseQueue(
@NonNull CachingConnectionFactory connectionFactory)
return new Queue(privateInquiryResponseQueueName(connectionFactory.getUsername()),
false,
true,
true);
错误日志:
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:721) ~[spring-rabbit-2.3.10.jar:2.3.10]
... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'broadcastQueue.MYUSER_123' in vhost 'app', class-id=50, method-id=10)
【问题讨论】:
【参考方案1】:autoStartup
工作正常 - 必须有其他东西启动容器 - 添加断点以查看启动它的内容 - 您是否在应用程序上下文中调用 start()
?这将启动容器。
Direct reply-to 是一种特殊的 RabbitMQ 模式,它使用伪队列进行回复;对于命名的回复队列,您需要一个侦听器容器。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to
【讨论】:
以上是关于Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置的主要内容,如果未能解决你的问题,请参考以下文章
消息队列客户端开发向导二(基于 Spring 的 amqp 实现)
消息队列客户端开发向导二(基于 Spring 的 amqp 实现)