为啥在 spring-amqp 中导入 AsyncRabbitTemplate

Posted

技术标签:

【中文标题】为啥在 spring-amqp 中导入 AsyncRabbitTemplate【英文标题】:Why import AsyncRabbitTemplate in spring-amqp为什么在 spring-amqp 中导入 AsyncRabbitTemplate 【发布时间】:2017-01-05 09:13:11 【问题描述】:

在使用 AsyncRabbitTemplate.sendAndReceive() 或 AsyncRabbitTemplate.convertSendAndReceive() 方法处理回复消息时,由于回复消息是与调用方法异步返回的,我们可以使用回复队列的消息监听器来接收和处理回复消息,为什么 spring -amqp 框架导入AsyncRabbitTemplate 和RabbiteMessageFuture 来处理回复消息?对于消息监听器,我们可以控制相关的消费者线程, 但是对于 RabbitMessageFuture,后台线程是无法管理的。

--------------------添加于 2017/01/06------------------ -------

这只是你的选择。

回复可以以不同的顺序返回以发送。

使用异步模板,框架负责相关性 为您回复将出现在发送返回的未来 方法。

当您使用自己的监听器时,您必须注意 自己关联。


谢谢。我知道这个区别。但仍然存在问题。如果我使用消息监听器,我可以手动确认回复消息(如果我的消息监听器 实现ChannelAwareMessageListener接口,我可以得到通道实例)。但是当我使用asyncRabbitTemplate时,我可以手动确认回复消息吗?似乎 sendAndReceive 方法会自动确认回复消息。

我不明白你的意思;因为你可以注入监听器 将容器放入模板中,无论哪种方式,您都有相同的“控制”。


这个意思好像有问题。

我创建了一个 rabbitTemplate 实例和简单的消息侦听器容器。但是当我使用它们来构造一个 asyncRabbitTemplate 实例时,代码如下:

@Bean(name="rabbitTemplate")
public RabbitTemplate getRabbitTemplate()

        RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
        rabbitTemplate.setUseTemporaryReplyQueues(false);        
        rabbitTemplate.setReplyAddress("replyQueue");
        rabbitTemplate.setReceiveTimeout(60000);
        rabbitTemplate.setReplyTimeout(60000);
        return rabbitTemplate;


@Bean(name="asyncRabbitTemplate")
public AsyncRabbitTemplate getAsyncRabbitTemplate()

  AsyncRabbitTemplate asyncRabbitTemplate =
    new AsyncRabbitTemplate(getRabbitTemplate(), createReplyListenerContainer());
  asyncRabbitTemplate.setAutoStartup(true);
  asyncRabbitTemplate.setReceiveTimeout(60000);
  return asyncRabbitTemplate;


@Bean(name="replyMessageListenerContainer")
public SimpleMessageListenerContainer createReplyListenerContainer() 
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(getConnectionFactory());
        listenerContainer.setQueueNames("replyQueue");      
        listenerContainer.setMessageListener(getRabbitTemplate());
        listenerContainer.setRabbitAdmin(getRabbitAdmin());
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return listenerContainer;

我发现我无法成功发送消息。消费端服务器收不到消息。

但是当我使用以下代码创建asyncRabbitTemplate 实例时,我发现消息可以成功发送和接收。

@Bean(name="asyncRabbitTemplate")
public AsyncRabbitTemplate getAsyncRabbitTemplate()

   AsyncRabbitTemplate asyncRabbitTemplate =
            new AsyncRabbitTemplate(getConnectionFactory(),
                        "sendMessageExchange",
                        "sendMessageKey",
                        "replyQueue");
  asyncRabbitTemplate.setReceiveTimeout(60000);
  asyncRabbitTemplate.setAutoStartup(true);
  return asyncRabbitTemplate;

如果我的源代码有问题?

我使用的是 spring-boot-ampq 1.4.3.RELEASE。

【问题讨论】:

【参考方案1】:

这只是你的选择。

回复可以以不同的顺序返回以发送。

使用异步模板,框架会为您处理相关性 - 回复将出现在发送方法返回的未来。

当您使用自己的侦听器时,您必须自己处理相关性。

对于message listener,我们可以控制相关的consumer线程,但是对于RabbitMessageFuture,后台线程无法管理。

我不明白你的意思;由于您可以将侦听器容器注入到模板中,因此无论哪种方式,您都拥有相同的“控制”。

编辑

@SpringBootApplication
public class So41481046Application 

    public static void main(String[] args) throws Exception 
        ConfigurableApplicationContext context = SpringApplication.run(So41481046Application.class, args);
        AsyncRabbitTemplate asyncTemplate = context.getBean(AsyncRabbitTemplate.class);
        RabbitConverterFuture<String> future = asyncTemplate.convertSendAndReceive("foo");
        try 
            String out = future.get(10, TimeUnit.SECONDS);
            System.out.println(out);
        
        finally 
            context.close();
        
        System.exit(0);
    

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) 
        rabbitTemplate.setRoutingKey(queue().getName());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer(connectionFactory));
    

    @Bean
    public Queue queue() 
        return new AnonymousQueue();
    

    @Bean
    public Queue replyQueue() 
        return new AnonymousQueue();
    

    @Bean
    public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(replyQueue().getName());
        return container;
    

    @Bean
    public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queue().getName());
        container.setMessageListener(new MessageListenerAdapter(new Object() 

            @SuppressWarnings("unused")
            public String handleMessage(String in) 
                return in.toUpperCase();
            

        ));
        return container;
    


【讨论】:

亲爱的 Gary,我在原来的问题中添加了一些新问题。你能回答他们吗?谢谢。 我在使用外部容器时没有遇到任何问题 - 请参阅我的编辑 - 但是,回复确实只支持自动确认, 感谢gary,我会试试你的源代码并与我的源代码进行比较。如果有什么问题,我会再问你。

以上是关于为啥在 spring-amqp 中导入 AsyncRabbitTemplate的主要内容,如果未能解决你的问题,请参考以下文章

为啥在 TypeScript 中导入 CSS 不起作用?

为啥我不能在 Django 项目中导入模块?

为啥在 sagemaker 笔记本中导入 SparkContext 库时出现错误?

为啥我不能在 react native 中导入我的自定义组件

为啥我不能在 Spring-5.0.0.RELEASE 中导入 ApplicationContext

为啥在 phpmyadmin 中导入的数据库更小?