RabbitMQ:实现消息确认与消息返回

Posted ITKaven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ:实现消息确认与消息返回相关的知识,希望对你有一定的参考价值。

不了解RabbitMQ的消息确认与消息返回可以参考下面两篇博客:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>

    <packaging>jar</packaging>

    <groupId>com.kaven</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>springboot</name>
    <description>springboot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/
spring.rabbitmq.exchange=""
spring.rabbitmq.routingKey=kaven
spring.rabbitmq.queue=kaven

RabbitMQProperties类(RabbitMQ的参数类):

package com.kaven.springboot.rabbitmq;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "spring.rabbitmq")
@Setter
@Getter
public class RabbitMQProperties 

    private String host;
    private int port;
    private String username;
    private String password;
    private String exchange;
    private String queue;
    private String routingKey;
    private String virtualHost;

RabbitMQConfirmCallbackAndReturnsCallback类(用于消息确认与消息返回的回调):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQConfirmCallbackAndReturnsCallback implements RabbitTemplate.ConfirmCallback,
        RabbitTemplate.ReturnsCallback 

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) 
        if (ack) 
            System.out.printf("%s 消息成功发送到RabbitMQ\\n", correlationData.getId());
            ReturnedMessage returned = correlationData.getReturned();
            if(returned != null) 
                returnedMessage(returned);
            

         else 
            System.out.printf("%s 消息发送到RabbitMQ失败, %s\\n", correlationData.getId(), cause);
        
    

    @Override
    public void returnedMessage(ReturnedMessage returned) 
        System.out.printf("%s %s %s %s %d\\n",
                returned.getExchange(),
                returned.getRoutingKey(),
                returned.getMessage(),
                returned.getReplyText(),
                returned.getReplyCode()
        );
    

RabbitMQConfig类(定义RabbitMQ组件的配置类):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import javax.annotation.Resource;

@Configuration
public class RabbitMQConfig 

    @Resource
    private RabbitMQProperties properties;

    @Bean
    public ConnectionFactory connectionFactory() 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(properties.getHost(), properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setVirtualHost(properties.getVirtualHost());
        // 连接工厂开启消息确认和消息返回机制
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         RabbitMQConfirmCallbackAndReturnsCallback confirmCallbackAndReturnsCallback) 
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // RabbitTemplate设置消息确认和消息返回的回调实例
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallbackAndReturnsCallback);
        rabbitTemplate.setReturnsCallback(confirmCallbackAndReturnsCallback);
        return rabbitTemplate;
    

    @Bean
    public DirectExchange exchange() 
        return new DirectExchange(properties.getExchange());
    

    @Bean
    public Queue queue() 
        //队列持久
        return new Queue(properties.getQueue(), true);
    

    @Bean
    public Binding binding() 
        return BindingBuilder.bind(queue()).to(exchange()).with(properties.getRoutingKey());
    

Producer类(用于发布消息):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class Producer 

    private final RabbitTemplate rabbitTemplate;

    @Resource
    private RabbitMQProperties properties;

    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) 
        this.rabbitTemplate = rabbitTemplate;
    

    public void sendMsg(String msg, boolean returned) 
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("10000");
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        // 如果returned为true
        // 发送的消息不会路由到队列
        // 因为路由键properties.getRoutingKey() + "-returned"没有绑定的队列
        // 因此会触发消息返回
        if(returned) 
            rabbitTemplate.send(properties.getExchange(),
                    properties.getRoutingKey() + "-returned",
                    message, correlationId);
        
        else 
            rabbitTemplate.send(properties.getExchange(), properties.getRoutingKey(), message, correlationId);
        
    

Consumer类(用于消费消息):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer 

    @RabbitListener(queues = "$spring.rabbitmq.queue")
    public void process(String msg) 
        System.out.println("接收消息: " + msg);
    

ProducerController类(用于发布消息的接口):

package com.kaven.springboot.rabbitmq;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController 

    @Resource
    private Producer producer;

    @GetMapping("/send")
    public String send(String msg, boolean returned) 
        producer.sendMsg(msg, returned);
        return "发送消息成功";
    

启动类:

package com.kaven.springboot;

import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitMQProperties.class)
public class SpringbootApplication 
    public static void main(String[] args) 
        SpringApplication application = new SpringApplication(SpringbootApplication.class);
        application.run(args);
    

启动应用,Spring Boot会与RabbitMQ建立连接。

请求接口。

控制台输出:

// 消息确认
a9f3afef-72c5-406a-8e75-e9af46a89381 消息成功发送到RabbitMQ
接收消息: "我不会触发消息返回,但可以触发消息确认"

消息成功发送到RabbitMQ触发了消息确认,并且可路由,因此不会触发消息返回。

再次请求接口。

控制台输出:

// 消息返回
"" kaven-returned (Body:'[B@6d598368(byte[53])' MessageProperties [headers=spring_returned_message_correlation=34c045fb-7706-4aca-b775-7befa6c9be01, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=10000, priority=0, deliveryTag=0]) NO_ROUTE 312

// 消息确认
34c045fb-7706-4aca-b775-7befa6c9be01 消息成功发送到RabbitMQ
"" kaven-returned (Body:'[B@6d598368(byte[53])' MessageProperties [headers=spring_listener_return_correlation=2facdfdd-f4ec-4625-ae0b-6f913798eb29, spring_returned_message_correlation=34c045fb-7706-4aca-b775-7befa6c9be01, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=10000, priority=0, redelivered=false, receivedExchange="", receivedRoutingKey=kaven-returned, deliveryTag=0]) NO_ROUTE 312

消息成功发送到RabbitMQ触发了消息确认。

但不可路由,因此也会触发消息返回。

不触发消息确定的情况比较难测试(关掉RabbitMQ服务,Spring Boot会不断尝试重连,请求接口会响应500),这里就不测试了,应该没问题。RabbitMQ整合Spring Boot实现消息确认与消息返回就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

以上是关于RabbitMQ:实现消息确认与消息返回的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:Confirm确认消息 Return返回消息

RabbitMQ:Confirm确认消息 Return返回消息

RabbitMQ confirm的确认监听模式

rabbitmq:消息远程复制(Shovel 插件)

使用rabbitmq手动确认消息的,定时获取队列消息实现

RabbitMQ 消息确认与公平调度消费者