RabbitMQ 请求/响应“RabbitTemplate 未配置为侦听器”

Posted

技术标签:

【中文标题】RabbitMQ 请求/响应“RabbitTemplate 未配置为侦听器”【英文标题】:RabbitMQ request/response "RabbitTemplate is not configured as listener" 【发布时间】:2016-05-11 08:17:11 【问题描述】:

我正在使用 Spring-AMQP rabbitmq 实现测试请求/响应模式,但我无法让它工作......

我已经配置了以下工件:

test_exchange 与问候队列。路由键 = 问候 reply_exchange 与回复队列。路由键 = 回复

@Bean
public ConnectionFactory connectionFactory() 
    CachingConnectionFactory connectionFactory = 
            new CachingConnectionFactory("....IP of broker...");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;


@Bean
public Queue greeting() 
    return new Queue("greeting");


@Bean
public Queue replies() 
    return new Queue("replies");


MessageListener receiver() 
    return new MessageListenerAdapter(new RabbitMqReceiver(), "onMessage");


@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Queue replies) 
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setExchange("test_exchange");
    template.setRoutingKey("greeting");     
    template.setReplyAddress("reply_exchange"+"/"+replies.getName());
    template.setReplyTimeout(60000);
    return template;


@Bean
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory, 
        RabbitTemplate rabbitTemplate, Queue replies) 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setMessageListener(rabbitTemplate);
    container.setQueues(replies);
    return container;


@Bean
public SimpleMessageListenerContainer serviceListenerContainer(
        ConnectionFactory connectionFactory) 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(greeting());
    container.setMessageListener(receiver());
    return container;

我在 github 上关注示例,但它崩溃了:

原因:java.lang.IllegalStateException: RabbitTemplate 未配置为 MessageListener - 不能使用“replyAddress”:reply_exchange/replies

文档说:

从 1.5 版开始,RabbitTemplate 将检测它是否已配置为 MessageListener 以接收回复。否则,尝试发送和接收带有回复地址的消息将失败并返回 IllegalStateException(因为永远不会收到回复)。

这很好,但是 RabbitTemplate 是如何做到的呢?它如何检测是否配置为 MessageListener?

提前致谢

PS:发送代码:

public void send() 
    Message message = MessageBuilder.withBody("Payload".getBytes())
            .setContentType("text/plain")
            .build();
    Message reply = this.template.sendAndReceive(message);
    System.out.println("Reply from server is: "+new String(reply.getBody()));

【问题讨论】:

【参考方案1】:

回复容器启动时,检测到模板为ListenerContainerAware,并调用expectedQueueNames()检索回复队列(如果replyAddress的格式为exch/rk,则为null);如果返回非空结果,则容器检查队列是否正确;如果 exch/rk 是回复地址,你会得到这个

logger.debug("Cannot verify reply queue because it has the form 'exchange/routingKey'");

此方法无条件设置 isListener 布尔值,从而避免该异常。所以看起来容器在您发送消息之前还没有启动 - 您是在上下文完全初始化之前发送的吗?

请注意,由于 RabbitMQ 实现了直接回复,因此通常不再需要使用回复容器(除非您想要 HA 回复队列或出于其他原因需要显式回复队列)。直接回复消除了促使我们实现回复容器机制的性能问题。

【讨论】:

【参考方案2】:

Gary,你的直觉一如既往地完美。

我将发送代码更改为:

@SpringBootApplication
public class App 

    @Bean(initMethod="send")
    public RabbitMqSender sender() 
        final RabbitMqSender sender = new RabbitMqSender();
        return sender;
    

    public static void main(String[] args) throws Exception 
        SpringApplication.run(App.class, args);
    

到:

@SpringBootApplication
public class App 
   
public static void main(String[] args) throws Exception 
    final ConfigurableApplicationContext configAppContext =   SpringApplication.run(App.class, args);
    final RabbitMqSender sender = configAppContext.getBean(RabbitMqSender.class);
    sender.send();

确保在所有 bean 都准备好并且请求/响应工作出色时发送!

是的,我也一定会尝试 Direct-TO 模式。

感谢加里的帮助。

问候

托马斯

【讨论】:

以上是关于RabbitMQ 请求/响应“RabbitTemplate 未配置为侦听器”的主要内容,如果未能解决你的问题,请参考以下文章

#yyds干货盘点#RabbitMQ示例2:工作队列

如何解决RabbitMQ REST调用415响应问题?

等待 fanout 交换上的所有 rabbitmq 响应?

从 rabbitmq 获取已发布消息的响应。戈朗

RabbitMQ RPC 关闭最终消息的响应队列

RabbitMQ中RPC的实现及其通信机制