Spring AMQP - 发送和接收消息

Posted

技术标签:

【中文标题】Spring AMQP - 发送和接收消息【英文标题】:Spring AMQP - Sender and Receiving Messages 【发布时间】:2017-01-19 12:14:03 【问题描述】:

我在接收来自 RabbitMQ 的消息时遇到问题。 我正在发送如下消息

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);

如果我们在 RabbitMQ 中看到,我们将得到一个完全限定的类型。

在当前场景中,我们有 n 个生产者用于同一个消费者。如果我使用任何映射器,它会导致异常。 我将如何发送一条消息,使其不包含任何 type_id,并且我可以将消息作为 Message 对象接收,然后我可以将其绑定到接收器中的自定义对象。

我收到如下消息。 您能否让我知道如何使用 Jackson2MessageConverter 以便消息从接收端直接绑定到我的 Object/HashMap。我现在也从发件人那里删除了 Type_ID。

消息在 RabbitMQ 中的外观

优先级:0 交付模式:2 标头:ContentTypeId:java.lang.Object KeyTypeId:java.lang.Object content_encoding:UTF-8 content_type:application/json "Execution_start_time":1473747183636,"status":"SUCCESS"

@Component
public class AdapterOutputHandler 

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message)

        System.out.println("Receiver:::::::::::"+message.toString());

    


连接

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() 
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    

例外

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)

我不想使用来自发件人的 __TYPE__ID,因为它们是同一个队列的多个发件人,并且只有一个消费者。

【问题讨论】:

导致异常信息不足。请添加堆栈跟踪 其实 rabbitmq 中的 headers 包含一个名为 type_id_ 的属性。这不应该。 如何发送不存在 type_id_ 属性的消息 priority: 0 delivery_mode: 2 __TypeId__: com.diff.approach.JobListenerDTO** content_encoding: UTF-8 content_type: application/json 【参考方案1】:

导致异常

什么例外?

TypeId:com.diff.approach.JobListenerDTO

这意味着您发送的是 DTO,而不是您在问题中描述的哈希映射。

如果要移除 typeId 标头,可以使用消息后处理器...

rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> 
    m.getMessageProperties.getHeaders().remove("__TypeId__");
    return m;
);

(或, new MessagePostProcessor() ...,如果您不使用 Java 8)。

编辑

您使用的是什么版本的 Spring AMQP?使用 1.6,您甚至不必删除 __TypeId__ 标头 - 框架会查看侦听器参数类型并告诉 Jackson 转换器类型,以便它自动转换为该类型(如果可以的话)。正如你在这里看到的;无需删除类型 ID 即可正常工作...

package com.example;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39443850Application 

    private static final String QUEUE = "so39443850";

    public static void main(String[] args) throws Exception 
        ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
        context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
        context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
        context.close();
    

    private final CountDownLatch latch = new CountDownLatch(1);

    @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
    public void listen(HashMap<String, Object> message) 
        System.out.println(message.getClass() + ":" + message);
        latch.countDown();
    

    @Bean
    public Queue queue() 
        return new Queue(QUEUE);
    

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    

    @Bean
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) 
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    

    public static class DTO 

        private String foo;

        private String baz;

        public DTO(String foo, String baz) 
            this.foo = foo;
            this.baz = baz;
        

        public String getFoo() 
            return this.foo;
        

        public void setFoo(String foo) 
            this.foo = foo;
        

        public String getBaz() 
            return this.baz;
        

        public void setBaz(String baz) 
            this.baz = baz;
        

    


结果:

class java.util.HashMap:foo=baz, baz=qux

这在the documentation...中有所描述。

在 1.6 之前的版本中,转换 JSON 的类型信息必须在消息头中提供,或者需要自定义 ClassMapper。从 1.6 版本开始,如果没有类型信息头,则可以从目标方法参数中推断出类型。

您还可以配置自定义ClassMapper 以始终返回HashMap

【讨论】:

谢谢 Gary,还有一个问题,如何接收此消息? headers: __ContentTypeId__: java.lang.Object __KeyTypeId__: java.lang.Object content_encoding: UTF-8 content_type: application/json 我收到如下消息@RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE) public void handleAdapterQueueMessage(Message message) byte[] body = message.getBody(); 如何从byte[] 转换回hashmap ?? 不要将代码放入 cmets - 它不可读 - 改为编辑您的问题。您需要在侦听器容器工厂中使用 Jackson2JsonMessageConverter 你好,Gary,我已经添加了我的接收代码,你能建议如何使用Jackson2JsonMessageConverter Spring AMQP 版本 - 1.6.1.RELEASE Spring RabbitMQ 版本 - 1.5.6.RELEASE【参考方案2】:

想要在接收消息时使用“一个”不同的 Java 类?

使用自定义 ClassMapper 配置 @Bean Jackson2JsonMessageConverter

想要在接收消息时使用“许多”不同的 Java 类?如:

@MyAmqpMsgListener
void handlerMsg(
        // Main message class, by MessageConverter
        @Payload MyMsg myMsg, 

        // Secondary message class - by MessageConverter->ConversionService
        @Payload Map<String, String> map,

        org.springframework.messaging.Message<MyMsg> msg,
        org.springframework.amqp.core.Message amqpMsg
) 
    // ...

提供一个自定义的@Bean ConverterConversionServiceRabbitListenerAnnotationBeanPostProcessor

@Bean
FormattingConversionServiceFactoryBean rabbitMqCs(
        Set<Converter> converters
) 
    FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean();
    fac.setConverters(converters);
    return fac;

@Bean
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory(
        @Qualifier("rabbitMqCs")
        FormattingConversionService rabbitMqCs
) 
    DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
    defaultFactory.setConversionService(rabbitMqCs);
    return defaultFactory;


// copied from RabbitBootstrapConfiguration
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor(
        MessageHandlerMethodFactory handlerFac
) 
    RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor();
    bpp.setMessageHandlerMethodFactory(handlerFac);
    return bpp;


@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() 
    return new RabbitListenerEndpointRegistry();

参考资料:

Jackson2JsonMessageConverter AMQP-461 调试源码PayloadArgumentResolver

【讨论】:

以上是关于Spring AMQP - 发送和接收消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring AMQP使用noLocal使用者发送和接收临时队列

Spring AMQP项目

在 python 中使用 AMQP 库发送和接收消息

Spring AMQP:由于缺少回复属性,从 POJO 侦听器发送回复失败

用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收

用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收