Spring和SpringBoot整合RabbitMQ

Posted Lossdate

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring和SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。

一、Spring整合RabbitMQ

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.7.RELEASE</version>
    </dependency>
</dependencies>

1. Producer

1.1 Config

@Configuration
public class RabbitConfig {

    /**
     * 连接工厂
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(URI.create("amqp://root:123456@192.168.200.136:5672/%2f"));
    }

    /**
     * RabbitTemplate
     */
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /**
     * RabbitAdmin
     */
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /**
     * Queue
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("queue.anno").build();
    }

    /**
     * Exchange
     */
    @Bean
    public Exchange fanoutExchange() {
        return new FanoutExchange("ex.anno.fanout", false, false, null);
    }

    /**
     * Binding
     */
    @Bean
    @Autowired
    public Binding binding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key,anno").noargs();
    }

}

1.1 Producer

public class ProducerApp {
    public static void main(String[] args) throws Exception {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);

        RabbitTemplate template = context.getBean(RabbitTemplate.class);

        MessageProperties messageProperties = MessagePropertiesBuilder
                .newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("gbk")
                .setHeader("myKey", "myValue")
                .build();

        Message msg = MessageBuilder.withBody("Hello World".getBytes("gbk"))
                .andProperties(messageProperties)
                .build();

        template.send("ex.anno.fanout", "key.anno", msg);

        context.close();
    }
}

2. Consumer拉取推送消息

2.1 Config

@Configuration
public class RabbitConfig {

    /**
     * 连接工厂
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(URI.create("amqp://root:123456@192.168.200.136:5672/%2f"));
    }

    /**
     * RabbitTemplate
     */
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /**
     * RabbitAdmin
     */
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /**
     * Queue
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("queue.anno").build();
    }

}

2.2 Consumer

public class ConsumerApp {
    public static void main(String[] args) throws Exception {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);

        RabbitTemplate template = context.getBean(RabbitTemplate.class);

        Message receive = template.receive("queue.anno");

        System.out.println(new String(receive.getBody(), receive.getMessageProperties().getContentEncoding()));

        context.close();
    }
}

3. Consumer消息监听(用于推消息)

3.1 Config

@ComponentScan("demo") //扫描包
@Configuration
@EnableRabbit
public class RabbitConfig {

    /**
     * 连接工厂
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(URI.create("amqp://root:123456@192.168.200.136:5672/%2f"));
    }

    /**
     * RabbitTemplate
     */
    @Bean
    @Autowired
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /**
     * RabbitAdmin
     */
    @Bean
    @Autowired
    public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /**
     * Queue
     */
    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("queue.anno").build();
    }

    /**
     * SimpleRabbitListenerContainerFactory
     */
    @Bean("rabbitListenerContainerFactory")
    @Autowired
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        //最少允许多少个并发的消费者
        factory.setConcurrentConsumers(10);
        //最大允许多少个并发的消费者
        factory.setMaxConcurrentConsumers(15);
        //按照批次消费消息,一个批次多少个
        factory.setBatchSize(10);

        return factory;
    }
}

3.2 MessageListener

@Component
public class MyMessageListener {

    /**
     * RabbitListener : 监听消息队列
     * com.rabbitmq.client.Channel channel对象
     * org.springframework.amqp.core.Message message对象 可以直接操作原生的AMQP消息
     * org.springframework.messaging.Message to use the messaging abstraction counterpart
     * @Payload 注解方法参数,改参数的值就是消息体
     * @Header 注解方法参数,访问指定的消息头字段的值
     * @Headers 该注解的方法参数获取该消息的消息头的所有字段,参数类型对应于map集合。
     * MessageHeaders 参数类型,访问所有消息头字段
     * MessageHeaderAccessor or AmqpMessageHeaderAccessor 访问所有消息头字段
     */
//    @RabbitListener(queues = "queue.anno")
//    public void onMessage(Message message) {
//        System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
//    }

    @RabbitListener(queues = "queue.anno")
    public void onMessage(@Payload String messageStr) {
        System.out.println(messageStr);
    }
}

3.3 Consumer

public class ConsumerListenerApp {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(RabbitConfig.class);
    }
}

二、SpringBoot整合RabbitMQ

1. Producer

1.1 构建Springboot 2.5.0 + Spring for RabbitMQ + Spring Web

1.2 application.properties

spring.application.name=springboot_rabbit
spring.rabbitmq.host=192.168.200.136
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

1.3 Config

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return new Queue("queue.boot", false, false, false, null);
    }

    @Bean
    public Exchange exchange() {
//        new Exchange()
//        return new TopicExchange("topic.ex", false, false, null);
//        return new DirectExchange("direct.ex", false, false, null);
//        return new FanoutExchange("fanout.ex", false, false, null);
//        return new HeadersExchange("header.ex", false, false, null);
        //交换器名称,交换器类型(),是否是持久化的,是否自动删除,交换器属性Map集合
//        return new CustomExchange("custom.ex", ExchangeTypes.DIRECT, false, false, null);
        return new TopicExchange("ex.boot", false, false, null);
    }

    @Bean
    public Binding binding() {
        //绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key,绑定的属性
//        new Binding("", Binding.DestinationType.EXCHANGE, "", "", null);
        //绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key,绑定的属性
//        new Binding("", Binding.DestinationType.QUEUE, "", "", null);
        //绑定了交换器ex.boot到队列queue.boot,路由key是key.boot
        return new Binding("queue.boot",
                Binding.DestinationType.QUEUE,
                "ex.boot",
                "key.boot",
                null);
    }

}

1.4 Controller

@RestController
public class MessageController {

    private final AmqpTemplate rabbitTemplate;

    @Autowired
    public MessageController(AmqpTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RequestMapping("/rabbit/{message}")
    public String receive(@PathVariable String message) {

        MessageProperties messageProperties = MessagePropertiesBuilder
                .newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("utf-8")
                .setHeader("Hello", "World")
                .build();

        Message msg = MessageBuilder
                .withBody(message.getBytes(StandardCharsets.UTF_8))
                .andProperties(messageProperties)
                .build();

        //exchange + routing key + msg
        rabbitTemplate.send("ex.boot", "key.boot", msg);

        return "ok";
    }
}

2. Consumer(监听模式)

2.1 构建Springboot 2.5.0 + Spring for RabbitMQ

2.2 application.properties

spring.application.name=springboot_rabbit_consumer
spring.rabbitmq.host=192.168.200.136
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

2.3 Config

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return QueueBuilder.nonDurable("queue.boot").build();
    }

}

2.4 ConsumerListener

@Component
public class MyMessageListener {

    @RabbitListener(queues = "queue.boot")
    public void getMyMessage(@Payload String messageStr, 
                             @Header(name = "Hello") String headerValue) {
        System.out.println(messageStr);
        System.out.println("Hello -> " + headerValue);
    }

}

3. 测试

3.1 启动Consumer和Producer

3.2 访问http://localhost:8080/rabbit/testMsg

可以看到Consumer控制台输出:
testMsg
Hello -> World

以上是关于Spring和SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ

springboot整合rabbit,支持消息确认机制

Rabbit MQ和Spring Boot的整合

Spring Cloud Stream整合Rabbit之重复投递

Spring Cloud Stream整合Rabbit之重复投递

如何在 Spring Boot 测试中模拟 Spring amqp/rabbit