@RabbitListener没有从队列接收消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了@RabbitListener没有从队列接收消息相关的知识,希望对你有一定的参考价值。

我正在使用@RabbitListner注释来接收来自RabbitMq队列的消息。

虽然我已经完成了执行此操作所需的所有步骤(即在我的配置类中添加@EnableRabbit注释)并将SimpleRabbitListenerContainerFactory声明为bean,但我的方法仍然没有收到队列中的消息。任何人都可以建议我缺少的东西:

我正在使用Spring Boot来启动我的应用程序

我的启动课

@Configuration
@EnableAutoConfiguration
@EnableRabbit
@EnableConfigurationProperties
@EntityScan("persistence.mysql.domain")
@EnableJpaRepositories("persistence.mysql.dao")
@ComponentScan(excludeFilters = { @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = ApiAuthenticationFilter.class),@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = ApiVersionValidationFilter.class)},basePackages = {"common", "mqclient","apache", "dispatcher" })
public class Application {

public static void main(final String[] args) {
    final SpringApplicationBuilder appBuilder = new SpringApplicationBuilder(
            Application.class);
    appBuilder.application().setWebEnvironment(false);
    appBuilder.profiles("common", "common_mysql_db", "common_rabbitmq")
            .run(args);
}

@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource primaryDataSource() {
    return DataSourceBuilder.create().build();
}


}

这是我的Bean在组件类中定义SimpleRabbitListenerContainerFactory

@Component(value = "inputQueueManager")
public class InputQueueManagerImpl extends AbstractQueueManagerImpl {

..///..

 @Bean(name = "inputListenerContainerFactory")
 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()     

 {
 SimpleRabbitListenerContainerFactory factory = new    
 SimpleRabbitListenerContainerFactory();
 factory.setConnectionFactory(this.rabbitConnectionFactory);
 factory.setConcurrentConsumers(Integer.parseInt(this.concurrentConsumers));


factory.setMaxConcurrentConsumers(Integer.parseInt(this.maxConcurrentConsumers));
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;

} }

最后我的Listener在另一个Controller组件中

@Controller
public class RabbitListner{
 @RabbitListener(queues = "Storm1", containerFactory = "inputListenerContainerFactory")
 @Override
 public void processMessage(QueueMessage message) {
    String topic = message.getTopic();
    String payload = message.getPayload();
    dispatcher.bean.EventBean eventBean = new dispatcher.bean.EventBean();
    System.out.println("Data read from the queue");

不幸的是,我将消息发送到队列,但processMessage中的代码没有被执行。

我不知道这里有什么问题。任何人都可以帮忙吗?

答案

默认情况下,Json消息转换器需要在消息属性中提示要创建的对象类型。

如果您的生产者没有设置这些属性,那么在没有一些帮助的情况下它将无法进行转换。

你可以将ClassMapper注入转换器。

该框架提供了一个可以自定义的DefaultClassMapper - 要么查看与默认__TypeId__属性不同的消息属性。

如果您始终希望将json转换为同一对象,则只需设置默认类型:

DefaultClassMapper classMapper = newDefaultClassMapper();
classMapper.setDefaultType(QueueMessage.class);
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(classMapper);
factory.setMessageConverter(new Jackson2JsonMessageConverter());

documentation already shows how to configure this

以上是关于@RabbitListener没有从队列接收消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring AMQP 惰性队列@RabbitListener 并简化直接回复配置

Spring Boot中@RabbitListener注解使用RabbitMQ

RabbitMQ笔记十三:使用@RabbitListener注解消费消息

springboot+RabbitMQ 问题 RabbitListener 动态队列名称:Attribute value must be constant

动态 SendTo 注释

我们如何使用@RabbitListener 连接到消息处理之前/之后