如何在 Spring Boot 和 RabbitMQ 中配置和接收并将 jSON 有效负载转换为域对象

Posted

技术标签:

【中文标题】如何在 Spring Boot 和 RabbitMQ 中配置和接收并将 jSON 有效负载转换为域对象【英文标题】:How to configure and receiveAndConvert jSON payload into domain Object in Spring Boot and RabbitMQ 【发布时间】:2015-06-18 00:12:57 【问题描述】:

最近我对使用 Spring Boot 的微服务架构产生了浓厚的兴趣。我的实现有两个 Spring Boot 应用程序;

Application One 接收来自 RESTful API 的请求,转换 JSON 负载并将其发送到 RabbitMQ queueA

应用程序二,已订阅 queueA,接收 jSON 有效负载(域对象用户)并应该在应用程序二中激活服务,例如。向用户发送电子邮件。

在我的应用程序二配置中不使用 XML,如何配置一个转换器,将从 RabbitMQ 接收到的 jSON 有效负载转换为域对象用户。

以下是应用程序二上 Spring Boot 配置中的 sn-ps

Application.class

@SpringBootApplication
@EnableRabbit
public class ApplicationInitializer implements CommandLineRunner 

    final static String queueName = "user-registration";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AnnotationConfigApplicationContext context;

    @Bean
    Queue queue() 
        return new Queue(queueName, false);
    

    @Bean
    TopicExchange topicExchange() 
        return new TopicExchange("user-registrations");
    

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) 
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    

    public static void main(String[] args) 
        SpringApplication.run(ApplicationInitializer.class, args);
    

    @Override
    public void run(String... args) throws Exception 
        System.out.println("Waiting for messages...");
    


TestService.java

@Component
public class TestService 

    /**
     * This test verifies whether this consumer receives message off the user-registration queue
     */
    @RabbitListener(queues = "user-registration")
    public void testReceiveNewUserNotificationMessage(User user) 
        // do something like, convert payload to domain object user and send email to this user
    


【问题讨论】:

【参考方案1】:

我遇到了同样的问题,经过一些研究和测试后我了解到,在 SpringBoot 中配置 RabbitMQ-Receiver 的方法不止一种,但重要的是选择一种并坚持下去。

如果您决定使用Annotation Driven Listener Endpoint,我从您对@EnableRabbit 和@RabbitListener 的使用中得出什么,而不是您发布的配置对我不起作用。有效的方法如下:

从 org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer 派生您的配置类并覆盖方法 configureRabbitListeners,如下所示:

 @Override
public void configureRabbitListeners(
        RabbitListenerEndpointRegistrar registrar) 
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());

并添加 MessageHandlerFactory:

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() 
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(new MappingJackson2MessageConverter());
    return factory;

此外,您需要定义 SimpleRabbitListenerContainerFactory(就像您已经做过的那样)并自动装配相应的 ConnectionFactory:

@Autowired
public ConnectionFactory connectionFactory;

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    return factory;

完成您的配置后,您需要定义 Bean,它处理您的消息并继承 @RabbitListerner-Annotations。对我来说,我将其命名为 EventResultHandler(您将其命名为 TestService):

    @Bean
public EventResultHandler eventResultHandler() 
    return new EventResultHandler();

然后在您的 EventResultHandler(或 TestService)中定义 @RabbitListener-Methods 及其相应的队列和有效负载(= POJO,您的 JSON 消息被序列化到的位置):

@Component
public class EventResultHandler 

    @RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
    public void handleMessage(@Payload Event event) 
        System.out.println("Event received");
        System.out.println("EventType: " + event.getType().getText());
    

我省略了队列和交换所需的定义和绑定 - 你可以在一个或另一个微服务中执行此操作 - 或在 RabbitMQ-Server 中手动执行......但你肯定必须这样做。

【讨论】:

你先生.....应该获得奖牌。几天前将我的项目父项更新为 Spring Boot 到 1.3.0.RELEASE 后,出于一些有趣的原因,我之前的实现中断了。谢谢【参考方案2】:

创建一个杰克逊消息转换器并将其设置为MessageListenerAdapter#setMessageConverter

@Bean
public MessageConverter jsonMessageConverter() 
    return new Jackson2JsonMessageConverter();

MessageListenerAdapter从哪里来?

【讨论】:

确保您正在导入正确的类以使其正常工作 - 您需要 org.springframework.amqp.support.converter.MessageConverter 而不是 org.springframework.messaging.converter。消息转换器【参考方案3】:

根据 Spring Boot 版本 2.1.4.RELEASE,您可以执行以下操作:

    使用 Jackson 消息转换器声明“装饰”RabbitMq 模板:
    @Bean
    RabbitTemplate rabbitTemplate(RabbitTemplate rabbitTemplate) 
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
           return rabbitTemplate
        
    从队列中读取消息:
        var receievedValie = rabbitTemplate.receiveAndConvert("TestQueue", new ParameterizedTypeReference<Integer>() 
            @Override
            public Type getType() 
                return super.getType();
            
        )

【讨论】:

以上是关于如何在 Spring Boot 和 RabbitMQ 中配置和接收并将 jSON 有效负载转换为域对象的主要内容,如果未能解决你的问题,请参考以下文章

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

在 spring-boot 中如何分离前端和后端机器?

如何在 Spring Boot 和 JUnit 测试中定义 spring.config.location?

如何在 Spring Boot 和 Spring WebFlux 中使用“功能 bean 定义 Kotlin DSL”?

如何隔离spring boot app redis和spring boot session全局redis

spring-boot中如何配置tomcat访问日志的位置和名称?