Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置

Posted

技术标签:

【中文标题】Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置【英文标题】:Spring AMQP lazy queue @RabbitListener and simplify direct reply configuration 【发布时间】:2021-10-08 07:31:56 【问题描述】:

目前我遇到以下问题: 外部 RabbitMQ 远程服务器首先希望我收到一条异步回复登录消息,然后此远程服务器创建一个广播队列 broadcastQueue.MYUSER_123。从这里我想通过 @RabbitListener 注释来消费,这在代码示例中可以看到。但是我遇到了一个错误,你可以在下面看到。

在调试时,我注意到一个容器在我在代码中执行登录之前启动了这个监听器,因此在连接这个广播队列时它会被拒绝。我在How to Lazy Load RabbitMQ queue using @Lazy in spring boot? 中找到了帖子,但我这边的问题是,自动启动不起作用,并且为侦听器启动了容器。我做错了什么?是否可以创建惰性队列?

还有一点是: 有没有一种简单的方法可以为 RabbitMQ 模板进行直接回复配置,我可以在其中指定回复地址?根据spring amqp代码看来,如果您自己不指定回复地址,您似乎只能进行直接回复连接。所以我需要创建一个自定义容器。

感谢您的帮助。

亲切的问候 斯文

@SpringBootTest
@Slf4j
class DemoAmqpApplicationTests 

    @RabbitListener(queues="broadcastQueue.MYUSER_123",autoStartup = "false")
    public void handleMessage(Object obj) 
        log.info("",obj);

    

    @Test
    void contextLoads() throws InterruptedException 
        Message<User> login = login();
                

    

配置

@Configuration
@Slf4j
@EnableRabbit
public class RabbitMqConfiguration 

    @Bean
    public RabbitAdmin amqpAdmin(RabbitTemplate rabbitTemplate) 
        return new RabbitAdmin(rabbitTemplate);
    

    @Bean
    public RabbitTemplate rabbitTemplate(@NonNull CachingConnectionFactory connectionFactory,
                                         @NonNull MessageConverter messageConverter,
                                         @NonNull Queue inquiryResponseQueue,
                                         @NonNull RabbitTemplateConfigurer configurer) 

        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        configurer.configure(rabbitTemplate, connectionFactory);

        String username = connectionFactory.getUsername();

        configurePostReceiveProcessor(rabbitTemplate);

        rabbitTemplate.setMessageConverter(messageConverter);

        configurePrepareSendingProcessor(rabbitTemplate, username, inquiryResponseQueue.getName());

        configureReply(rabbitTemplate, inquiryResponseQueue);

        return rabbitTemplate;
    

    private void configureReply(RabbitTemplate rabbitTemplate,
                             
                                @NonNull Queue inquiryResponseQueue) 
        rabbitTemplate.setReplyAddress(inquiryResponseQueue.getName());
        rabbitTemplate.setDefaultReceiveQueue(inquiryResponseQueue.getName());
    

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer(
            @NonNull CachingConnectionFactory connectionFactory,
            @NonNull List<Queue> inquiryResponseQueue,
            @NonNull RabbitTemplate rabbitTemplate,
            @NonNull RabbitAdmin rabbitAdmin) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(inquiryResponseQueue.get(0));
        container.setMessageListener(rabbitTemplate);
        return container;
    

    @Bean
    public Queue privateInquiryResponseQueue(
            @NonNull CachingConnectionFactory connectionFactory) 
        return new Queue(privateInquiryResponseQueueName(connectionFactory.getUsername()),
                false,
                true,
                true);
    

    



错误日志:

    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:721) ~[spring-rabbit-2.3.10.jar:2.3.10]
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'broadcastQueue.MYUSER_123' in vhost 'app', class-id=50, method-id=10)

【问题讨论】:

【参考方案1】:

autoStartup 工作正常 - 必须有其他东西启动容器 - 添加断点以查看启动它的内容 - 您是否在应用程序上下文中调用 start()?这将启动容器。

Direct reply-to 是一种特殊的 RabbitMQ 模式,它使用伪队列进行回复;对于命名的回复队列,您需要一个侦听器容器。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to

【讨论】:

以上是关于Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置的主要内容,如果未能解决你的问题,请参考以下文章

Spring使用Spring和AMQP发送接收消息(上)

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

Spring AMQP Java 客户端中的队列大小

Spring AMQP 使用 noLocal 消费者使用临时队列发送和接收

Spring AMQP使用noLocal使用者发送和接收临时队列