RabbitMQ:MessageConverter消息转换器

Posted ITKaven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:MessageConverter消息转换器相关的知识,希望对你有一定的参考价值。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>

    <packaging>jar</packaging>

    <groupId>com.kaven</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>springboot</name>
    <description>springboot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
spring.rabbitmq.exchange=""
spring.rabbitmq.routingKey=kaven
spring.rabbitmq.queue=kaven

RabbitMQProperties类(RabbitMQ的参数类):

package com.kaven.springboot.rabbitmq;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "spring.rabbitmq")
@Setter
@Getter
public class RabbitMQProperties 

    private String host;
    private int port;
    private String username;
    private String password;
    private String exchange;
    private String queue;
    private String routingKey;
    private String virtualHost;

User类(消息负载的实体类):

package com.kaven.springboot.rabbitmq;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Setter
@Getter
@ToString
@AllArgsConstructor
public class User 
    private String username;
    private String password;
    private String code;

Json2UserMessageConverter类(消息转换器,将json数据转换成User对象,json数据由消息体的byte[]生成):

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class Json2UserMessageConverter implements MessageConverter 

    private static final Gson GSON = new Gson();

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException 
        return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8));
    

    @Override
    public Object fromMessage(Message message) throws MessageConversionException 
        return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
    

CustomizeMessageListener类(自定义消息监听器):

package com.kaven.springboot.rabbitmq;

import org.springframework.stereotype.Component;

@Component
public class CustomizeMessageListener 
    public void customizeHandleMessage(User user) 
        System.out.printf("处理用户数据: %s\\n", user);
    

RabbitMQConfig类(定义RabbitMQ组件的配置类):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import javax.annotation.Resource;

@Configuration
public class RabbitMQConfig 

    @Resource
    private RabbitMQProperties properties;

    @Bean
    public ConnectionFactory connectionFactory() 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(properties.getHost(), properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setVirtualHost(properties.getVirtualHost());
        return connectionFactory;
    

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
        return new RabbitTemplate(connectionFactory);
    

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,
                                                                         CustomizeMessageListener delegate,
                                                                         Json2UserMessageConverter messageConverter) 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        // 设置连接工厂
        container.setConnectionFactory(connectionFactory);
        // 指定要创建的并发消费者数量
        // 默认值为1
        container.setConcurrentConsumers(3);
        // 设置消费者数量的上限
        // 默认为concurrentConsumers
        // 消费者将按需添加
        // 不能小于concurrentConsumers
        container.setMaxConcurrentConsumers(5);
        // 设置要从中接收消息的队列名称
        // 参数为String... queueName
        container.setQueueNames(properties.getQueue());
        // 控制容器在消息确认方面的行为
        // 自动确认
        // 如果手动确认
        // 可以使用ChannelAwareMessageListener消息监听器
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // 告诉代理在单个请求中向每个消费者发送多少条消息
        // 通常可以将其设置得相当高以提高吞吐量
        container.setPrefetchCount(3);

        // 创建适配器
        MessageListenerAdapter adapter = new MessageListenerAdapter();
        // 设置委托对象
        adapter.setDelegate(delegate);
        // 设置默认的监听方法名称
        adapter.setDefaultListenerMethod("customizeHandleMessage");
        // 设置消息转换器
        adapter.setMessageConverter(messageConverter);

        // 设置MessageListener(消息监听器)
        container.setMessageListener(adapter);
        return container;
    

    @Bean
    public DirectExchange exchange() 
        return new DirectExchange(properties.getExchange());
    

    @Bean
    public Queue queue() 
        //队列持久
        return new Queue(properties.getQueue(), true);
    

    @Bean
    public Binding binding() 
        return BindingBuilder.bind(queue()).to(exchange()).with(properties.getRoutingKey());
    

Producer类(用于发布消息):

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class Producer 

    private final RabbitTemplate rabbitTemplate;
    private static final Gson GSON = new Gson();

    @Resource
    private RabbitMQProperties properties;

    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) 
        this.rabbitTemplate = rabbitTemplate;
    

    public void sendMsg(User user) 
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("10000");
        Message message = new Message(GSON.toJson(user).getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.send(properties.getExchange(), properties.getRoutingKey(), message, correlationId);
    

ProducerController类(用于发布消息的接口):

package com.kaven.springboot.rabbitmq;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController 

    @Resource
    private Producer producer;

    @GetMapping("/send")
    public String send(User user) 
        System.out.println(user);
        producer.sendMsg(user);
        return "数据发送成功";
    

启动类:

package com.kaven.springboot;

import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitMQProperties.class)
public class SpringbootApplication 
    public static void main(String[] args) 
        SpringApplication application = new SpringApplication(SpringbootApplication.class);
        application.run(args);
    

启动应用,使用Postman请求接口。

控制台输出:

User(username=kaven, password=itkaven, code=908767)
处理用户数据: User(username=kaven, password=itkaven, code=908767)

输出符合预期,消息转换器的作用就是当SimpleMessageListenerContainer并发消费时,将获取的消息转换成想要的数据类型(将消息体中的byte[]转换成json,再将json转换成User对象,这里不讨论Json2UserMessageConverter类名是否合适),比如这里的User对象,之后消息监听器接收的就是进行了转换的数据类型,即直接处理User对象,单一职责原则。因此生产者发布的数据也应该从User对象转换成json,最后转换成byte[]类型。MessageConverter消息转换器就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

以上是关于RabbitMQ:MessageConverter消息转换器的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 第五课 RabbitMQ整合Spring AMQP实战

如何在 MappingJackson2MessageConverter 中设置 typeIdPropertyName

Spring整合JMS——MessageConverter介绍

Spring RestTemplate - 选择要使用的 MessageConverter

ActiveMQ之spring集成消息转换器MessageConverter

Spring整合JMS——MessageConverter介绍