RabbitMQ-消息可靠性&延迟消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-消息可靠性&延迟消息相关的知识,希望对你有一定的参考价值。

  • 一、MQ常见问题
  • 二、消息可靠性
    1、消息丢失可能发生的节点 2、生产者确认机制 3、消息持久化 4、消费者确认消息 5、失败重试机制
  • 三、死信交换机
    1、死信 2、死信交换机 3、TTL 4、死信交换机&TTL代码实现
  • 四、延迟消息
    1、延迟队列 2、应用场景 3、延迟队列插件安装 4、延迟队列代码实现 5、修改延时消息报异常的处理逻辑

  • 补充:Docker安装RabbitMQ(挂载插件数据卷)


一、MQ常见问题

  • ① 消息可靠性

确保发送的消息至少被消费一次;

  • ② 延迟消息

实现消息的延迟投递;

  • ③ 消息堆积

处理消息无法及时消费的问题;

  • ④ 高可用

避免单点MQ故障导致整体不可用;


二、消息可靠性

1、消息丢失可能发生的节点

【RabbitMQ-消息可靠性&延迟消息】_发送消息


  • ① 发送时丢失

Ⅰ 生产者发送的消息未送达exchange;

Ⅱ 消息到达exchange后未到达queue


  • ② MQ宕机,queue将消息丢失
  • ③ consumer接收到消息后未消费就宕机

2、生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

  • ① ​​publisher-confirm​​​,发送者确认
    Ⅰ 消息成功投递到交换机,返回ack;
  • Ⅱ 消息未投递到交换机,返回nack。
  • ② ​​publisher-return​​,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id​,以区分不同消息,避免ack冲突。

  • ① application.yml配置

Ⅰ ​​publish-confirm-type​​​:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirm结果,直到超时;
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback;

Ⅱ ​​​publish-returns​​​:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback;

Ⅲ ​​​template.mandatory​​:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。


spring:
rabbitmq:
publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true # 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template:
mandatory: true # 定义消息路由失败时的策略。true:则调用ReturnCallback;false:则直接丢弃消息。
  • ② 给RabbitTemplate配置ReturnCallback

注意:每个RabbitTemplate只能配置一个ReturnCallback。

Spring的bean默认为单例,让CommonConfig实现ApplicationContextAware接口,就是为了在Spring准备好容器后给rabbitTemplate对象设置ReturnCallback。


@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware // Spring容器准备好后通知该类
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) ->
// 记录日志
log.info("发送消息到队列失败,应答码,原因,交换机,路由键,消息",
replyCode, replyTest, exchange, routingKey, message.toString());
// 重发消息代码
);

另外,此处省略了重发消息的代码实现,具体可以根据业务需求编写。

  • ③ 代码实现ConfirmCallback

Ⅰ 获取CorrelationData对象,设置全局唯一ID,区分不同消息;

Ⅱ 设置ConfirmCallback的函数实现,消息发送到MQ成功和异常的处理函数;

注意:此处省略了重发消息的代码实现,具体可以根据业务需求编写。


@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException
// 路由键
String routingKey = "simple";
// 消息体
String message = "hello, spring amqp!";
// 消息ID,封装到
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 添加callback
correlationData.getFuture().addCallback(
result ->
// 判断结果
if (result.isAck())
// ack,消息发送成功
log.debug("消息发送到交换机成功,ID:", correlationData.getId());
else
// nack,消息发送失败
log.error("消息发送到交换机失败,ID:,原因", correlationData.getId(), result.getReason());
// 重发消息

,
ex ->
// 记录日志
log.error("消息发送异常,ID:,原因", correlationData.getId(), ex.getMessage());
// 重发消息

);
// 发送消息,此处记得添加correlationData
rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
  • ④ 处理消息确认的情形

Ⅰ publisher-comfirm:
消息成功发送到exchange,返回ack;
消息发送失败,没有到达交换机,返回nack;
消息发送过程中出现异常,没有收到回执。

Ⅱ 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback。

3、消息持久化

  • ① 交换机持久化


@Bean
public DirectExchange simpleExchange()
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.exchange", true, false);

注意:其实直接new也是持久化,默认走如下方法:

【RabbitMQ-消息可靠性&延迟消息】_发送消息_02


  • ② 消息队列持久化


@Bean
public Queue simpleQueue()
// 使用QueueBuilder构建队列,durable持久化
return QueueBuilder.durable("simple.queue").build();

注意:直接new Queue("simple.queue");也是持久化的,如下:

【RabbitMQ-消息可靠性&延迟消息】_持久化_03


  • ③ 消息体持久化


@Test
public void testDurableMsg()
// 路由键
String routingKey = "simple";
// 消息体
String message = "Hello, durable.";
// 消息持久化
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.build();
// 发送消息
rabbitTemplate.convertAndSend("simple.exchange", routingKey, msg);

注意:直接发送普通消息,默认也是持久化的,如下:


public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;

4、消费者确认消息

  • ① 消息确认模式

Ⅰ manual:手动ack,需要在业务代码结束后,调用api发送ack;
Ⅱ auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
Ⅲ none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

  • ② 配置文件配置


spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack

5、失败重试机制

  • ① 默认重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环。

  • ② Spring的retry机制

Spring机制重试次数耗尽后,消息会被reject,丢弃。


spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费失败重试
initial-interval: 1000 # 初始失败等待时间为1秒
multiplier: 3 # 下次失败等待时间的倍数,下次等待时长last*multiplier
max-attempts: 3 # 最大重试次数
stateless: true # 无状态;false则为有状态。业务中包含事务则改为false。
  • ③ 失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要由MessageRecoverer接口来处理。

Ⅰ ​​RejectAndDontRequeueRecoverer​​:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
Ⅱ ​​ImmediateRequeueMessageRecoverer​​:重试耗尽后,返回nack,消息重新入队;
Ⅲ ​​RepublishMessageRecoverer​​:重试耗尽后,将失败消息投递到指定的交换机。

覆盖原有策略实现:
Ⅰ 自定义异常消息交换机、队列、绑定:


@Bean
public DirectExchange errorMessageExchange()
return new DirectExchange("error.direct");


@Bean
public Queue errorQueue()
return new Queue("error.queue", true);


@Bean
public Binding errorBinding()
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");

Ⅱ 覆盖原有策略实现:


@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");


三、死信交换机

1、死信

  • ① 消费者使用​​basic.reject​​或​​basic.nack​​声明消费失败,并且消息的requeue参数设置为false;
  • ② 消息是一个过期消息,​​超时​​无人消费;
  • ③ 要投递的队列消息​​堆积满​​了,最早的消息可能成为死信。

2、死信交换机

如果队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey。

3、TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信。

TTL的两种情形:
Ⅰ 消息所在的​​队列​​设置了存活时间;
Ⅱ ​​消息本身​​设置了存活时间。

4、死信交换机&TTL代码实现

  • ① 使用注解的形式声明一组死信交换机、队列,并绑定,如下:


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue", durable = "true"),
exchange = @Exchange(name = "dl.direct"),
key = "dl"
))
public void listenDlQueue(String msg)
log.info("接收到dl.queue的延迟消息:", msg);
  • ② 给队列设置超时时间,在声明队列时配置x-message-ttl属性:


@Bean
public DirectExchange ttlExchange()
return new DirectExchange("ttl.direct");


@Bean
public Queue ttlQueue()
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间为10s
.deadLetterExchange("dl.direct") // 指定死信交换机
.deadLetterRoutingKey("dl") // 指定死信RoutingKey
.build();


@Bean
public Binding ttlBinding()
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("tl");
  • ③ 给TTL队列发送消息


@Test
public void testTTLMsg()
// 路由键
String routingKey = "tl";
// 消息体
String message = "Hello, TTL.";
// 消息持久化
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.setExpiration("5000") // 5s
.build();
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", routingKey, msg);

注意:此处我们设置了queue的超时时间,以及msg的超时时间,最后MQ会以其中较短的时间来实现


四、延迟消息

1、延迟队列

利用TTL结合死信交换机,实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

2、应用场景

  • ① 延迟发送短信;
  • ② 用户下单,如果用户在15 分钟内未支付,则自动取消;
  • ③ 预约工作会议,20分钟后自动通知所有参会人员。

3、延迟队列插件安装

  • ① TTL+死信队列

详见死信交换机章节内容。

  • ② 延迟队列插件

插件安装指南:Scheduling Messages with RabbitMQ

官方插件社区:Community Plugins — RabbitMQ

插件下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

Ⅰ 下载插件
Ⅱ 将插件上传到数据卷目录​​/var/lib/docker/volumes/mq-plugins/_data​​ 数据卷地址查看指令:docker volume inspect mq-plugins
Ⅲ 安装插件
进入容器内部:docker exec -it mq bash
启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

【RabbitMQ-消息可靠性&延迟消息】_发送消息_04


4、延迟队列代码实现

  • ① 基于@RabbitListener的实现

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg)
log.info("接收到delay.queue的延迟消息:", msg);
  • ② 基于Java代码的实现

​delayed() // 设置delay属性为true​


@Bean
public DirectExchange delayedExchange()
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型及名称
.delayed() // 设置delay属性为true
.durable(true) // 持久化
.build();


@Bean
public Queue delayedQueue()
return new Queue("delay.queue");


@Bean
public Binding delayedBinding()
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
  • ③ 向延迟队列发送消息


@Test
public void testDelayedMsg()
// 路由键
String routingKey = "delay";
// 消息体
String message = "Hello, Delay.";
// 消息延迟设置
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
.setHeader("x-delay", 10000)
.build();
// 发送消息
rabbitTemplate.convertAndSend("delay.direct", routingKey, msg);

【RabbitMQ-消息可靠性&延迟消息】_持久化_05


5、修改延时消息报异常的处理逻辑

  • ① ReturnCallback处理逻辑

添加延时时间属性值的判断,该属性大于0则是延迟消息,不报错误提示。


// 判断是否是延迟消息
if (message.getMessageProperties().getReceivedDelay() > 0)
// 是一个延迟消息,忽略这个错误提示
return;
  • ② 测试

【RabbitMQ-消息可靠性&延迟消息】_消息发送_06


  • ③ ReturnCallback完整代码


@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware // Spring容器准备好后通知该类

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) ->
// 判断是否是延迟消息
if (message.getMessageProperties().getReceivedDelay() > 0)
// 是一个延迟消息,忽略这个错误提示
return;

// 记录日志
log.info("发送消息到队列失败,应答码,原因,交换机,路由键,消息",
replyCode, replyTest, exchange, routingKey, message.toString());
// 重发消息代码
);


补充:Docker安装RabbitMQ(挂载插件数据卷)

  • ① 拉取镜像


docker pull rabbitmq:3.8-management
  • ② 运行容器

​-e RABBITMQ_DEFAULT_USER=test​​设置用户名为test;
​-e RABBITMQ_DEFAULT_PASS=123321​​设置密码为123456;
​-v mq-plugins:/plugins​​挂载数据卷;
​--name mq​​容器名mq;
​--hostname mq1​​主机名mq1;
​-p 15672:15672​​管理界面端口(此处前面的端口是我们设置的,后面的是需要被映射的,下同);
​-p 5672:5672​​MQ端口(内部使用);


docker run \\
-e RABBITMQ_DEFAULT_USER=test \\
-e RABBITMQ_DEFAULT_PASS=123456 \\
-v mq-plugins:/plugins \\
--name mq \\
--hostname mq1 \\
-p 15672:15672 \\
-p 5672:5672 \\
-d \\
rabbitmq:3.8-management

五、结尾

以上即为RabbitMQ-消息可靠性&延迟消息的全部内容



RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

系列文章目录

RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)

RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式

RabbitMQ:第四章:RabbitMQ集群搭建


文章目录


前言

本文通过实战代码,Spring整合RabbitMQ,项目分二个模块,consumer和produle。


提示:以下是本篇文章正文内容,下面案例可供参考

一、项目代码

1.生产者

1.项目架构图:


代码如下(示例):

2.pom.xml依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sky</groupId>
    <artifactId>spring-rabbitmq-produle</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3.spring-rabbitmq-producer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
    默认交换机类型为direct,名字为:"",路由键为队列的名称
    -->
    <!--
        id:bean的名称
        name:queue的名称
        auto-declare:自动创建
        auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
        durable:是否持久化
    -->

    <rabbit:queue id="spring_queue" name="spring_queue"    auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange"  auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="spring_fanout_queue_1"  />
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>


    <!-- 定义队列-->
    <rabbit:queue id="spring_direct_queue" name="spring_direct_queue"  auto-declare="true"/>

    <!--
      定义 Routing  路由模式 交互机
    -->
    <rabbit:direct-exchange name="spring_direct_exchange" >
        <rabbit:bindings>
            <!--direct 类型的交换机绑定队列  key :路由key  queue:队列名称-->
            <rabbit:binding queue="spring_direct_queue" key="direct"></rabbit:binding>
        </rabbit:bindings>

    </rabbit:direct-exchange>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_one" name="spring_topic_queue_one"  auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_two" name="spring_topic_queue_two" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_three" name="spring_topic_queue_three" auto-declare="true"/>

    <!--
      声明  topic 类型的交换机
    -->
    <rabbit:topic-exchange id="spring_topic_exchange"  name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="one.*"  queue="spring_topic_queue_one"/>
            <rabbit:binding pattern="two.#" queue="spring_topic_queue_two"/>
            <rabbit:binding pattern="three.#" queue="spring_topic_queue_three"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

</beans>

4.rabbitmq.properties:

rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring

说明:这里免费提供rabbitmq连接方式给大家使用学习

5.ProducerTest:

package com.sky.springrabbitmqprodule;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 简单模式发消息
     */
    @Test
    public void testHelloWorld(){
        rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
    }


    /**
     * 广播模式发消息
     */
    @Test
    public void testFanout(){
        rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
    }

    /**
     * 路由模式发消息
     */
    @Test
    public void testDirect(){
        rabbitTemplate.convertAndSend("spring_direct_exchange","direct","spring Direct....");
    }

    /**
     * 通配符模式发消息
     */
    @Test
    public void testTopics(){
        rabbitTemplate.convertAndSend("spring_topic_exchange","one.onekey","spring topic one....");
        rabbitTemplate.convertAndSend("spring_topic_exchange","two.twokey.topic","spring topic two....");
        rabbitTemplate.convertAndSend("spring_topic_exchange","three.threekey.topic","spring topic three....");
    }
}

2.消费者

1.项目架构图


代码如下(示例):

2.pom.xml依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sky</groupId>
    <artifactId>spring-rabbitmq-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

3.spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    以上是关于RabbitMQ-消息可靠性&延迟消息的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ之消息可靠性死信交换机惰性队列及集群

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列

RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)

RabbitMQ实现延迟发送消息

使用RabbitMQ的死信队列实现延迟消息