spring boot rabbitmq MappingJackson2MessageConverter 自定义对象转换

Posted

技术标签:

【中文标题】spring boot rabbitmq MappingJackson2MessageConverter 自定义对象转换【英文标题】:spring boot rabbitmq MappingJackson2MessageConverter custom object conversion 【发布时间】:2015-08-26 13:18:25 【问题描述】:

我正在尝试使用 spring boot 创建一个简单的 spring boot 应用程序,该应用程序将消息“生成”到 rabbitmq 交换/队列和另一个示例 spring boot 应用程序“使用”这些消息。 所以我有两个应用程序(如果你愿意,也可以是微服务)。 1)“生产者”微服务 2)“消费者”微服务

“生产者”有 2 个域对象。 Foo 和 Bar 应该被转换为 json 并发送到 rabbitmq。 “消费者”应分别接收 json 消息并将其转换为域 Foo 和 Bar。 由于某种原因,我无法完成这个简单的任务。这方面的例子不多。 对于消息转换器,我想使用 org.springframework.messaging.converter.MappingJackson2MessageConverter

这是我目前所拥有的:

生产者微服务

package demo.producer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner 

    public static void main(String[] args) 
        SpringApplication.run(ProducerApplication.class, args);
    

    @Bean
    Queue queue() 
        return new Queue("queue", false);
    

    @Bean
    TopicExchange exchange() 
        return new TopicExchange("exchange");
    

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) 
        return BindingBuilder.bind(queue).to(exchange).with("queue");
    

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() 
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    

    @Autowired
    private Sender sender;

    @Override
    public void run(String... args) throws Exception 
        sender.sendToRabbitmq(new Foo(), new Bar());
    


@Service
class Sender 

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;
    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    public void sendToRabbitmq(final Foo foo, final Bar bar) 

        this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);

        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo);
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar);

    


class Bar 
    public int age = 33;


class Foo 
    public String name = "gustavo";

消费者微服务

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements CommandLineRunner 

    public static void main(String[] args) 
        SpringApplication.run(ConsumerApplication.class, args);
    

    @Autowired
    private Receiver receiver;

    @Override
    public void run(String... args) throws Exception 

    



@Service
class Receiver 
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) 
        System.out.println("Received <" + foo.name + ">");
    

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) 
        System.out.println("Received <" + bar.age + ">");
    


class Foo 
    public String name;


class Bar 
    public int age;

这是我得到的例外:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)]
Bean [demo.consumer.Receiver@1672fe87]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers=amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113)
    ... 12 common frames omitted

异常说没有转换器,这是真的,我的问题是我不知道如何在消费者端设置MappingJackson2MessageConverter转换器(请注意我要使用org.springframework.messaging.converter.MappingJackson2MessageConverter 而不是 org.springframework.amqp.support.converter.JsonMessageConverter)

有什么想法吗?

以防万一,您可以在以下位置创建此示例项目: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

【问题讨论】:

看这里:***.com/questions/29337550/… 在该示例中,它使用 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter (属于依赖项 spring-amqp),在我的情况下我想使用 org.springframework.messaging.converter.MappingJackson2MessageConverter(属于spring-messaging)。 【参考方案1】:

好的,我终于搞定了。

Spring 使用 PayloadArgumentResolver 来提取、转换并将转换后的消息设置为使用 @RabbitListener 注释的方法参数。不知何故,我们需要将 ma​​ppingJackson2MessageConverter 设置到这个对象中。

因此,在 CONSUMER 应用中,我们需要实现 RabbitListenerConfigurer。通过重写 configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) 我们可以设置一个自定义的 DefaultMessageHandlerMethodFactory,我们为这个工厂设置消息转换器,工厂将创建我们的 PayloadArgumentResolver 使用正确的转换。

这是代码的sn-p,我还更新了git project。

ConsumerApplication.java

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements RabbitListenerConfigurer 

    public static void main(String[] args) 
        SpringApplication.run(ConsumerApplication.class, args);
    

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() 
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() 
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) 
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    

    @Autowired
    private Receiver receiver;



@Service
class Receiver 
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) 
        System.out.println("Received <" + foo.name + ">");
    

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) 
        System.out.println("Received <" + bar.age + ">");
    


class Foo 
    public String name;


class Bar 
    public int age;

因此,如果您运行 Producer 微服务,它将在队列中添加 2 条消息。一个代表 Foo 对象,另一个代表 Bar 对象。 通过运行消费者微服务,您将看到两者都被 Receiver 类中的相应方法使用。


更新问题:

我认为,我认为排队存在概念上的问题。通过声明两个用 @RabbitListener 注释的指向同一个队列的方法,我想要实现的目标是不可能的。上述解决方案无法正常工作。如果你发送给rabbitmq,比如说,6 条Foo 消息和3 条Bar 消息,它们不会被带有Foo 参数的监听器接收6 次。似乎侦听器是并行调用的,因此无法根据方法参数类型区分要调用的侦听器。 我的解决方案(我不确定这是否是最好的方法,我在这里接受建议)是为每个实体创建一个队列。 所以现在,我有了 queue.barqueue.foo,并更新了 @RabbitListener(queues = "queue.foo") 我再次更新了代码,您可以在我的git repository 中查看。

【讨论】:

1.5.0(目前处于里程碑 1)支持 class-level @RabbitListener@RabbitHandler 以支持此用例的各个方法。 如果我使用请求/回复,会得到另一个异常:Caused by: org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException: Failed to send reply with payload 'InvocationResult,我很困惑。【参考方案2】:

我自己没有这样做,但您似乎需要通过设置 RabbitTemplate 来注册适当的转换。请查看this Spring documentation 中的第 3.1.8 节。我知道它是使用 AMQP 类配置的,但是如果您提到的消息传递类是兼容的,则没有理由不能替代它。看起来this reference 解释了如何使用 Java 配置而不是 XML 来完成它。我没有真正使用过 Rabbit,所以我没有任何个人经验,但我很想听听你的发现。

【讨论】:

以上是关于spring boot rabbitmq MappingJackson2MessageConverter 自定义对象转换的主要内容,如果未能解决你的问题,请参考以下文章

spring-boot 集成 rabbitmq

spring-boot 集成 rabbitmq

spring boot 1.5.4 整合rabbitMQ(十七)

Spring Boot 整合 RabbitMQ

Spring Boot项目配置RabbitMQ集群

Spring Boot 整合 RabbitMQ