RabbitMQ消息队列笔记

Posted 知道什么是码怪吗?

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息队列笔记相关的知识,希望对你有一定的参考价值。

目录

交换机

扇出(fanout)

测试

 直接(direct)

测试

 主题(topic)

测试

死信队列

消息TTL过期测试

队列达到最大长度测试

消息被拒绝测试

延迟队列

延迟队列测试

延迟队列优化 

插件实现延迟队列

安装插件

测试插件


交换机

RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。生产者只能将消息发送到交换机(exchange)。

交换机工作内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机的类型决定了它们如何处理收到的消息。

交换机的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)

第一个参数表示交换机的名称,空串表示使用默认交换机。 

channel.basiPublish("","hello",null,message.getBytes());

绑定:交换机和信道之间的桥梁,让交换机知道和哪个队列进行了绑定,可通过绑定选择性的发送消息。

扇出(fanout)

发布订阅模式:生产者将信息发送到交换机,交换机再将信息发送到其他队列当中。

测试

定义交换机并发送信息

//定义交换机并发送信息
public class Emitlog 
    // 交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) 
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出的消息:" + message);
        
    

消息接收端 

这里的消息接收端需要开启两个,复制下面的代码再新建一个类,修改输出信息即可。

//消息接收
public class ReceiveLogs02 

    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();

        //声明一个队列,名称随机,当消费者断开与队列的连接时,队列自动删除
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机与队列
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("等待接受消息,把接受到的消息打印在屏幕上...");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println("ReceiveLogs02控制台打印接受到的消息:" + new String(message.getBody()));
        ;
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> 
        );
    

测试结果

两个信道订阅了同一个交换机,都接收到了信息。

 直接(direct)

队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

绑定之后交换机可通过关键字的不同,选择发送到不同的队列当中。

值得注意的是,可以一个key对应一个队列,也可以一个key对应多个队列。一个关键字绑定多个队列效果和扇出效果类似。没有绑定关系的信息将会被丢弃。

测试

生产者

public class DirectLogs 
    // 交换机的名称
    public static final String EXCHANGE_NAME = "direct_logs";
    public static final String[] choice = new String[]"error", "info", "warning";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) 
            String message = scanner.next();
            String way = scanner.next();
            scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME, way, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出的消息:" + message);
        
    

消费者 1

public class ReceiveLogsDirect01 
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个队列
        channel.queueDeclare("console", false, false, false, null);
        //绑定交换机与队列
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println("ReceiveLogsDirect01控制台打印接受到的消息:" + new String(message.getBody()));
        ;
        channel.basicConsume("console", true, deliverCallback, consumerTag -> 
        );
    

消费者2 

public class ReceiveLogsDirect02 
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个队列
        channel.queueDeclare("disk", false, false, false, null);
        //绑定交换机与队列
        channel.queueBind("disk", EXCHANGE_NAME, "error");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println("ReceiveLogsDirect02控制台打印接受到的消息:" + new String(message.getBody()));
        ;
        channel.basicConsume("disk", true, deliverCallback, consumerTag -> 
        );
    

测试结果

信息被发送到了不同的队列当中。

 主题(topic)

发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse" , "nyse.vmw" , "quick.orange.rabbit" 这种类型的。当然这个单词列表最多不能超过255个字节。

其中可以使用下面两个替换符来进行单词的替代:

* 代替一个单词

# 代替零个或多个单词

匹配案例

*.a.*:三个单词的字符串中,中间单词为 a 即匹配。

#.a:最后一个单词是 a 即匹配。

测试

生产者

public class EmitLogTopic 
    //交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();

        HashMap<String, String> map = new HashMap<>();
        map.put("quick.orange.rabbit", "被队列Q1Q2接收到");
        map.put("quick.orange.fox", "被队列Q1接收到");
        map.put("lazy.brown.fox", "被队列Q2接收到 ");
        map.put("lazy.pink.rabbit", "虽然满足队列Q2的两个绑定但是只会被接收一次");
        map.put("quick.orange.male.rabbit", "四个单词不匹配任何绑定会被丢弃");

        for (Map.Entry<String, String> bindingKeyEntry : map.entrySet()) 
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:" + message);
        
    

消费者1

public class ReceiveLogsTopic01 
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        //队列捆绑
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println(new String(message.getBody()));
            System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey());
        ;
        //接收消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> 
        );
    

消费者2

public class ReceiveLogsTopic02 
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        //队列捆绑
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "*lazy.#");
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println(new String(message.getBody()));
            System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey());
        ;
        //接收消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> 
        );
    

测试结果

先启动两个消费者,然后再启动生产者。

死信队列

死信:顾名思义就是无法被消费的消息。某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

死信来源:消息过期、消息队列已满无法添加新数据、消息被拒绝了并且没有再次将消息放回队列当中。

消息TTL过期测试

生产者

public class Producer 

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();

        //死信消息,设置TTL时间  单位是ms  10000ms是10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) 
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
        
    

普通消费者1

public class Consumer01 

    //普通交换机名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通的交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列
        HashMap<String, Object> arguments = new HashMap<>();
        //过期时间
        arguments.put("x-message-ttl", 10000);
        //正常队列设置死信队列
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        //声明死信和普通队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println("Consumer01接收的消息是:" + new String(message.getBody()));
        ;
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> 
        );
    

死信消费者2

public class Consumer02 

    //死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        ;
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> 
        );
    

测试结果

先启动普通消费者1,然后关闭普通消费者,启动生产者发送数据,在web网页查看。

 过了10秒钟之后,由于普通队列信息超时,信息被送往了死信队列。

这个时候打开死信消费者2,接收信息。 

队列达到最大长度测试

生产者

public class Producer 

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        for (int i = 0; i < 10; i++) 
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes(StandardCharsets.UTF_8));
        
    

普通消费者1中增加一行代码设置队列长度

arguments.put("x-max-length", 6);

死信消费者2代码不变

测试结果 

首先在web页面删除普通队列,因为参数以及改变了。再次启动普通消费者,然后关闭。启动生产者。

消息被拒绝测试

生产者和测试队列最大长度代码相同,普通消费者修改如下

public class Consumer01 

    //普通交换机名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通的交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明普通队列
        HashMap<String, Object> arguments = new HashMap<>();
        //正常队列设置死信队列
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        //声明死信和普通队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = (consumerTag, message) -> 
            String msg = new String(message.getBody());
            if (msg.equals(("info5"))) 
                System.out.println("拒绝此消息:" + msg);
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
             else 
                System.out.println("Consumer01接收的消息是:" + new String(message.getBody()));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            
        ;
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> 
        );
    

死信消费者代码不变

测试结果

首先在web管理页面删除普通信道,然后开启普通消费者1,最后开启生产者。

延迟队列

延迟队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

在springBoot中集成rabbitMQ,注意springBoot版本不宜过高,推荐2.5.0。

延迟队列测试

代码架构图

 Maven依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

目录结构

配置文件信息

spring.rabbitmq.host=192.168.193.147  #IP地址
spring.rabbitmq.port=5672    #端口号
spring.rabbitmq.username=admin    #账号
spring.rabbitmq.password=123    #密码

SwaggerConfig 类 

@Configuration
@EnableSwagger2
public class SwaggerConfig 
    @Bean
    public Docket webApiConfig() 
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    

    private ApiInfo webApiInfo() 
        return new ApiInfoBuilder()
                .title("rabbitmq接口文档")
                .description("本文档描述了rabbitmq微服务接口定义")
                .version("1.0")
                .contact(new Contact("enjoy6288", "www.couraki.com", "123456@qq.com"))
                .build();
    

TtlQueueConfig类

导入包的时候很有可能导入错误,所以把包名也一同写出。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/*
 * TTL队列 配置文件类代码
 * */
@Configuration
public class TtlQueueConfig 
    
    //普通交换机的名称
    public static final String X_EXCHANGE = "X";
    //死信交换机的名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列的名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列的名称
    public static final String DEAD_LATTER_QUEUE = "QD";

    //声明xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() 
        return new DirectExchange(X_EXCHANGE);
    

    //声明yExchange
    @Bean("yExchange")
    public DirectExchange yExchange() 
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    

    //声明队列
    @Bean("queueA")
    public Queue queueA() 
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置死信Routing-key
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置TTL 单位是ms
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    

    //声明普通队列 TTL为30s
    @Bean("queueB")
    public Queue queueB() 
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置死信Routing-key
        arguments.put("x-dead-letter-routing-key", "YD");
        //设置TTL 单位是ms
        arguments.put("x-message-ttl", 30000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    

    //死信队列
    @Bean("queueD")
    public Queue queueD() 
        return QueueBuilder.durable(DEAD_LATTER_QUEUE).build();
    

    //绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) 
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    

    //绑定
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) 
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    

    //绑定
    @Bean
    public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange) 
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    

生产者

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/message")
    public void sendMsg(@PathVariable String message) 
        log.info("当前时间:,发送一条信息给两个TTL队列:", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s的队列:" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为30s的队列:" + message);

    

消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer 

    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception 
        String msg = new String(message.getBody());
        log.info("当前时间:,收到死信队列的消息:", new Date().toString(), msg);
    

测试结果

首先启动SpringBoot项目,浏览器输入localhost:8080/ttl/sendMsg/nihao

日志输出结果

延迟队列优化 

如上面的使用方式,如果我们有新的时间需求,那么就要新增一个队列,也就意味着代码并不通用。我们应当设计一个通用的队列。

代码架构图 

TtlQueueConfig 类新增内容

public static final String QUEUE_C = "QC";

//声明QC队列
@Bean("queueC")
public Queue queueC()
    Map<String, Object> arguments = new HashMap<>();
    //设置死信交换机
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
    //设置死信RoutingKey
    arguments.put("x-dead-letter-routing-key","YD");
    return QueueBuilder.durable().withArguments(arguments).build();


@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange)
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");

生产者新增内容

//开始发消息
@GetMapping("sendExpirationMsg/message/ttlTime")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) 
    log.info("当前时间:,发送一条时长毫秒TTL信息给队列QC:",
            new Date().toString(), ttlTime, message);
    rabbitTemplate.convertAndSend("X", "XC", message, msg -> 
        //发送消息的时候 延迟时长
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    );

测试结果

localhost:8080/ttl/sendExpirationMsg/test1/20000

localhost:8080/ttl/sendExpirationMsg/test2/2000

两条信息先发送test1,延时20秒,然后发送test2,延时2秒。接收信息按照队列先后顺序接收。因为RabbitMQ智慧检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行!

插件实现延迟队列

安装插件

将插件拷贝到 rabbitMQ 插件文件夹中

cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/

进入此文件夹

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/

输入安装指令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启服务

systemctl restart rabbitmq-server.service

测试插件

代码架构图

目录结构 

生产者新增代码

//开始发消息 基于插件的消息及延迟的时间
@GetMapping("/sendDelayMsg/message/delayTime")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) 
    log.info("当前时间:,发送一条时长毫秒信息给延迟队列delayed.queue:",
            new Date().toString(), delayTime, message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME
            , DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> 
                // 发送消息的时候 延迟时长 单位ms
                msg.getMessageProperties().setDelay(delayTime);
                return msg;
            );

消费者 

@Slf4j
@Component
public class DeadLetterQueueConsumer 

    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception 
        String msg = new String(message.getBody());
        log.info("当前时间:,收到死信队列的消息:", new Date().toString(), msg);
    

DelayedQueueConfig 类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig 

    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    //声明队列
    @Bean
    public Queue delayedQueue() 
        return new Queue(DELAYED_QUEUE_NAME);
    
    //声明交换机
    @Bean
    public CustomExchange delayedExchange() 
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
                true, false, arguments);
    
    //绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange) 
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    

测试结果

localhost:8080/ttl/sendDelayMsg/test1/20000

localhost:8080/ttl/sendDelayMsg/test2/2000

先发送时长20秒的信息,然后在发送时长2秒的信息,先输出test2,再输出test1。 

以上是关于RabbitMQ消息队列笔记的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ实战-死信队列

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

RabbitMQ 死信队列

使用RabbitMQ处理死信队列