SpringBoot RabbitMq解析框中的json消息自动

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot RabbitMq解析框中的json消息自动相关的知识,希望对你有一定的参考价值。

我有我的springboot应用程序连接到RabbitMQ队列,我按照指定的Json格式推送消息,例如:{"messageId":"de5fe5a3-1831-4b87-891d-e7a4c29295b4","message":{"listingId":"2"},"errors":[]}我确实创建了相应的Java DTO obejct,但我不知道如何将rabbbitMq消息直接解析为java对象。我已经找到了使用Jackson ObjectMapper的方法,但是我希望自己能够独自完成它而不是我

我的DTO

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    final class ListingMessage {
        private final String messageId;
        private final Message message;
        private final List<String> errors;

        @Data
        static final class Message {
            private final String listingId;
        }
    }

我想实现这样的目标:

    @Component
    @Slf4j
    final class ListingMessageListener {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        @RabbitListener(queues = "${rabbitmq.queues}")
        public void receiveMessage(final ListingMessage message) {
                doSomeStuff(message); 
         }
       }

代替

@Component
@Slf4j
final class ListingMessageListener {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @RabbitListener(queues = "${rabbitmq.queues}")
    public void receiveMessage(final Message message) {

        final String json = new String(message.getBody());
        try {
            final ListingMessage listingMessage = OBJECT_MAPPER.readValue(json, ListingMessage.class);
            doSomeStuff(listingMessage);
        }catch(final Exception e){e.printStackTrace();}
     }
   }

编辑。当我添加Jackson2JsonMessageConverter时

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

我有一个例外

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.listingsdatapipeline.rabbitmq.ListingMessageListener.receiveMessage(com.listingsdatapipeline.rabbitmq.ListingMessage)]
Bean [com.listingsdatapipeline.rabbitmq.ListingMessageListener@626df4e5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:191) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.listingsdatapipeline.rabbitmq.ListingMessage] for GenericMessage [payload=byte[167], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=changes-consumer-de, amqp_deliveryTag=1, amqp_consumerQueue=changes-consumer-de, content-type=application/json, amqp_redelivered=false, id=8fb1f2ad-96dd-3d25-c77d-82f741cfa788, amqp_consumerTag=amq.ctag-DfMMARR_KXMXyNJMoEXyAQ, timestamp=1539177836655}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    ... 10 common frames omitted
答案

只需将Jackson2JsonMessageConverter @Bean添加到您的应用程序配置中,Boot将自动配置侦听器容器以使用它;然后@RabbitListener适配器将尝试转换为ListingMessage,因为这是方法参数类型。

以上是关于SpringBoot RabbitMq解析框中的json消息自动的主要内容,如果未能解决你的问题,请参考以下文章

Springboot下的RabbitMQ消息监听源码解读

rabbitmq 在Springboot项目中的运用

rabbitmq 在Springboot项目中的运用

rabbitmq 在Springboot项目中的运用

SpringBoot中使用rabbitmq,activemq消息队列和rest服务的调用

自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展