SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)

Posted 猿仁

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)相关的知识,希望对你有一定的参考价值。

目录

 

开始语

📝简述

🖋️代码实现-死信队列

application配置

Argument枚举

User对象

死信队列

生产者

消费者

结果验证

🖋️代码实现-延时队列

延迟队列

生产者

消费者

结果验证

🖋️代码实现-限流队列

限流队列

生产者

消费者-死信队列

结果验证

结束语


 

开始语

一位普通的程序员,慢慢在努力变强!

RabbitMQ部署方式(第一节)

SpringBoot集成RabbitMQ(第二节)

SpringBoot集成RabbitMQ之ACK确认机制(第三节)

📝简述

死信队列(Dead-Letter-Exchange)可以理解为是消息处理失败的接收者,其实也是一个普通的队列,当消息处理失败了,消息过期了,消息达到最大数量,消息被拒绝等...一系列的问题都可以由死信队列来接收,来处理。

死信队列的设置方法就是设置参数(x-dead-letter-exchange)来对死信队列进行一个捆绑,而捆绑的这个队列称为【死信队列】其实就是一个交换机路由队列,被捆绑死信的队列会有一个标识【DLX】,和普通的队列交换机没啥区别,就是设置了一个属性(Argument),当符合商务消息处理失败的一些条件时,就由死信队列来接收,想要实现死信队列,只需要在定义队列的时候加上一个参数即可:

 

 从上述图片可以看出,我们定义的死信队列【letter_queue】其实也就是一个交换机-普通队列,队列【error_queue】将被标识被DLX,当消费者消费异常时,那么就有死信死信队列【letter_queue】进行消费消息。

简单来讲,就是消费者最终消费失败的时候,由死信队列来消费次消息

🖋️代码实现-死信队列

application配置

spring:
  application:
    name: rabbitmq-deadLetter
  rabbitmq:
    host: tianyu.com.cn
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 20000
    listener:
      simple:
        prefetch: 50 # 队列限流50,其余消息给别的消费者消费
        max-concurrency: 50
        concurrency: 1
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3 # 默认是3,是一共三次,而不是重试三次,三次包含了第一执行,所以只重试了两次
          initial-interval: 2000 # 重试间隔时间。毫秒
  profiles:
    active: dev
server:
  port: 9090

Argument枚举

/**
 * 队列参数枚举
 *
 * @author tianyu.Ge
 * @date 2023/2/1 15:50
 */
public enum ArgumentEnum 
    X_MESSAGE_TTL("x-message-ttl", "发送到队列的消息在丢弃之前可以存活多长时间(毫秒)"),
    X_EXPIRES("x-expires", "队列在被自动删除(毫秒)之前可以使用多长时间"),
    X_MAX_LENGTH("x-max-length", "队列在开始从头部删除之前可以包含多少就绪消息"),
    X_MAX_LENGTH_BYTES("x-max-length-bytes", "队列在开始从头部删除之前可以包含的就绪消息的总体大小"),
    X_DEAD_LETTER_EXCHANGE("x-dead-letter-exchange", "设置队列溢出行为,消息被拒绝或过期,将重新发布这些名称(死信队列)"),
    X_DEAD_LETTER_ROUTING_KEY("x-dead-letter-routing-key", "(死信队列)-key"),
    X_MAX_PRIORITY("x-max-priority", "队列支持的最大优先级数;如果未设置,队列将不支持消息优先级,数字越大越优先"),
    X_QUEUE_MODE("x-queue-mode", "default[默认:内存] 和 lazy[存硬盘] 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息"),
    X_QUEUE_MASTER_LOCATOR("x-queue-master-locator", "将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则");

    public static final String X_MESSAGE_TTL_CONSTANT = "x-message-ttl";
    public static final String X_EXPIRES_CONSTANT = "x-expires";
    public static final String X_MAX_LENGTH_CONSTANT = "x-max-length";
    public static final String X_MAX_LENGTH_BYTES_CONSTANT = "x-max-length-bytes";
    public static final String X_DEAD_LETTER_EXCHANGE_CONSTANT = "x-dead-letter-exchange";
    public static final String X_DEAD_LETTER_ROUTING_KEY_CONSTANT = "x-dead-letter-routing-key";
    public static final String X_MAX_PRIORITY_CONSTANT = "x-max-priority";
    public static final String X_QUEUE_MODE_CONSTANT = "x-queue-mode";
    public static final String X_QUEUE_MASTER_LOCATOR_CONSTANT = "x-queue-master-locator";
    public String name;
    public String mess;

    private ArgumentEnum(final String name, final String mess) 
        this.name = name;
        this.mess = mess;
    

User对象

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * @author 猿仁
 * @date 2023-01-31 09:38
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable 

    private String name;

死信队列

import com.net.model.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 死信队列
 *
 * @author tianyu.Ge
 * @date 2023/2/1 11:47
 */
@Component
@Slf4j
public class DeadLetterExchangeCustomer 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "letter_queue_123"),
            exchange = @Exchange("letter.exchange"),
            key = "letter"))
    public void letterQueue123(User data, Channel channel, Message message) throws IOException 
        log.info("[letterQueue123]死信队列开始消费异常通信>>>>>>>>>>>>>>>>>>>> ", data);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

生产者

/**
 * 第四种模式:路由模式-死信队列
 */
@Test
public void routeDeadLetterTest() throws InterruptedException 
    User user = new User("info1你好!");
    rabbitTemplate.convertAndSend("route_direct", "info1", user);
    // error消费出现异常,会造成死循环,
    // 解决次问题:1.直接吃掉异常,不要抛出,记录日志或者存库,2.开启消费者重试模式,限定次数
    user = new User("error1你好!");
    rabbitTemplate.convertAndSend("route_direct", "error1", user);

    Thread.sleep(5000);
    System.out.println("消费异常数据列表:"+RouteDeadLetterCustomer.errorCountDetail);
    System.out.println("消费次数:"+RouteDeadLetterCustomer.errorCount);

消费者

import com.net.model.entity.User;
import com.net.model.enums.ArgumentEnum;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * 第四种模型:路由模式
 * 死信队列之:重试次数达到最大次数,死信开始消费
 */
@Component
@Slf4j
public class RouteDeadLetterCustomer 

    public static final String INFO = "info1";
    public static final String ERROR = "error1";
    public static Map<Integer, String> errorCountDetail = new HashMap<>();
    public static int errorCount = 0;

    /**
     * 死信队列使用:重试除数达到最大
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = RouteDeadLetterCustomer.ERROR + "_queue"
            , arguments = @Argument(name = ArgumentEnum.X_DEAD_LETTER_EXCHANGE_CONSTANT, value = "letter.exchange"), // 死信交换机
            @Argument(name = ArgumentEnum.X_DEAD_LETTER_ROUTING_KEY_CONSTANT, value = "letter") // 绑定死信队列
    ),//绑定临时队列,可以给名称,那么就不是临时队列
            exchange = @Exchange(value = "route_direct", type = ExchangeTypes.DIRECT),//绑定交换机名称和类型(默认是direct类类型)
            key = RouteDeadLetterCustomer.ERROR))
    public void error(User user, Channel channel, Message message, @Headers Map<String, Object> headers) throws IOException 
        log.info("交换机模式【direct】,key【error】,开始消费:,Channel:, Message:, Headers :", user, channel, message, headers);
        // 第二种限定重试次数为:3,抛出异常
        try 
            ++errorCount;
            int a = 0 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
         catch (Exception e) 
            System.out.println(e.getMessage());
            // 不抛出异常,吃掉异常,记录日志
            log.error("交换机模式【direct】,key【error】,消费异常:,消息体为:", e.getMessage(), user);
            errorCountDetail.put(errorCount, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",user=" + user);
            if (errorCount == 3) 
                log.info("[error]消费者消费次数已上限,当前次数为:次,接下来由死信队列消费:exchange=letter.exchange key=letter", errorCount);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
             else 
                throw e;
            
        
    

    @RabbitListener(bindings = @QueueBinding(value = @Queue,//绑定临时队列,可以给名称,那么就不是临时队列
            exchange = @Exchange(value = "route_direct"),//绑定交换机名称和类型(默认是direct类类型)
            key = RouteDeadLetterCustomer.INFO))
    public void info(@Payload User user, Channel channel, Message message, @Headers Map<String, Object> headers) throws IOException 
        log.info("交换机模式【direct】,key【info】,开始消费:", user);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

结果验证

结果分析:

从结果上来看,消息info1是消费成功的,执行一次,而消费者error1是消费失败的,一共执行了三次,最终达到最大消费次数,从而由死信队列来进行消费! 消费的时间间隔也是设置的2秒一次。

由死信队列进行处理了,就不会造成消息丢失而找不到的情况,在死信队列中就可以做发送短信通知,记录消息异常让对应的开发进行处理!

 

🖋️代码实现-延时队列

延迟队列

import com.net.model.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 死信队列-延迟队列的使用
 *
 * @author tianyu.Ge
 * @date 2023/2/1 11:47
 */
@Component
@Slf4j
public class TTLExchangeCustomer 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "letter_ttl_queue"),
            exchange = @Exchange("letter.ttl.exchange"),
            key = "letter.ttl"))
    public void letterTtl(User data, Channel channel, Message message) throws IOException 
        log.info("[letterTtl]死信队列开始消费异常通信>>>>>>>>>>>>>>>>>>>> ", data);
        log.info("[letterTtl]死信队列完成时间:", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    


生产者

/**
 * 第四种模式:路由模式-死信队列-延迟队列
 */
@Test
public void routeTTLTest() throws InterruptedException 
    User user = new User("TTL 你好!");
    System.out.println("[routeTTLTest]发送时间时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    rabbitTemplate.convertAndSend("route_direct", "ttl.key", user);

    Thread.sleep(5000);

消费者

import com.net.model.enums.ArgumentEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import java.util.HashMap;
import java.util.Map;

/**
 * 延迟队列
 */
@Configuration
//@Component
@Slf4j
public class RouteTTLCustomer 


    /**
     * 延迟队列
     */
    @Bean
    public Queue createQueue() 
        Map<String, Object> args = new HashMap<>(3);
        // 设置消息存活时间
        long expirationTime = 3000;
        args.put(ArgumentEnum.X_MESSAGE_TTL_CONSTANT, expirationTime);
        // 设置死信交换机
        args.put(ArgumentEnum.X_DEAD_LETTER_EXCHANGE_CONSTANT, "letter.ttl.exchange");
        // 设置死信key
        args.put(ArgumentEnum.X_DEAD_LETTER_ROUTING_KEY_CONSTANT, "letter.ttl");
        log.info("ttl_queue队列的存活时间为:ms",expirationTime);
        return new Queue("ttl_queue", true, false, false, args);
    

    /**
     * 创建交换机
     */
    @Bean
    public DirectExchange createDirectExchange() 
        return new DirectExchange("route_direct", true, false);
    

    /**
     * 队列绑定交换机
     */
    @Bean
    public Binding createDirectBinding() 
        return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("ttl.key");
    

结果验证

结果分析:

通过设置队列的消息过期时间,在队列中消息存活超过3秒中,将会由死信队列进行消费。

 

🖋️代码实现-限流队列

限流队列

import com.net.model.enums.ArgumentEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * 死信队列-限流队列
 */
//@Component
@Configuration
@Slf4j
public class RouteCurrentLimitingCustomer 



    /**
     * 代码块2--------------------------------------------------------------
     * 秒杀场景
     */
    /**
     * 创建队列
     */
    @Bean
    public Queue createQueue1() 
        Map<String, Object> args = new HashMap<>(3);
        // 设置消息存活时间
        long expirationTime = 5000;
        args.put(ArgumentEnum.X_MESSAGE_TTL_CONSTANT, expirationTime);
        // 设置最大容量
        long maxLength = 10;
        args.put(ArgumentEnum.X_MAX_LENGTH_CONSTANT, maxLength);
        // 设置死信交换机
        args.put(ArgumentEnum.X_DEAD_LETTER_EXCHANGE_CONSTANT, "letter.exchange");
        // 设置死信key
        args.put(ArgumentEnum.X_DEAD_LETTER_ROUTING_KEY_CONSTANT, "max.length.letter");
        log.info("currentLimiting_queue队列的最大容量为:个!",maxLength);
        return new Queue("currentLimiting_queue", true, false, false, args);
    

    /**
     * 创建交换机
     */
    @Bean
    public DirectExchange createDirectExchange1() 
        return new DirectExchange("route_direct", true, false);
    

    /**
     * 队列绑定交换机
     */
    @Bean
    public Binding createDirectBinding1() 
        return BindingBuilder.bind(createQueue1()).to(createDirectExchange1()).with("currentLimiting");
    

生产者

/**
 * 第四种模式:路由模式-死信队列-限流队列
 */
@Test
public void currentLimitingTest() throws InterruptedException 
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 15, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 1; i <= 1000000; i++) 
        final int a = i;
        threadPoolExecutor.execute(() -> 
            rabbitTemplate.convertAndSend("route_direct", "arguments", "张三" + a);
        );
    

    Thread.sleep(400000);
    System.out.println("此次抢购礼品人数:"+ CurrentLimitingExchangeCustomer.number);
    System.out.println("原子类此次抢购礼品人数:"+ CurrentLimitingExchangeCustomer.count.get());
    System.out.println("抢到礼品人数列表:"+ CurrentLimitingExchangeCustomer.list);
    System.out.println("抢到礼品人数:"+ CurrentLimitingExchangeCustomer.yesRobCount);
    System.out.println("未抢到礼品人数:"+ CurrentLimitingExchangeCustomer.noRobCount);

消费者-死信队列

import com.net.model.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 死信队列
 *
 * @author tianyu.Ge
 * @date 2023/2/1 11:47
 */
@Component
@Slf4j
public class CurrentLimitingExchangeCustomer 

    // 抢到礼品人数
    public static List<User> list = new ArrayList<>();
    // 抢购人数
    public static int number = 0;
    // 抢购人数原子类
    public static volatile AtomicInteger count = new AtomicInteger(0);
    // 没抢到礼品人数
    public static volatile AtomicInteger noRobCount = new AtomicInteger(0);
    // 抢到礼品人数
    public static volatile AtomicInteger yesRobCount = new AtomicInteger(0);

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "letter_queue"),
            exchange = @Exchange("letter.exchange"),
            key = "max.length.letter"))
    public void maxLengthLetter(User user, Channel channel, Message message) throws IOException 
        writeCompute(user, channel, message, "maxLengthLetter");
    

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "letter_queue"),
            exchange = @Exchange("letter.exchange"),
            key = "max.length.letter"))
    public void maxLengthLetter2(User user, Channel channel, Message message) throws IOException 
        writeCompute(user, channel, message, "maxLengthLetter2");
    

    /**
     * 防止并发修改问题
     *
     * @param user
     * @param channel
     * @param message
     * @param name
     * @return void
     * @author tianyu.Ge
     * @date 2023/2/6 17:00
     */
    public synchronized void writeCompute(User user, Channel channel, Message message, String name) throws IOException 
        number += 1;
        count.incrementAndGet();
        if (list.size() == 10) 
            noRobCount.incrementAndGet();
            log.error("活动结束,无礼品! 同学不要灰心,下次肯定是你!", user.getName());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            return;
        
        yesRobCount.incrementAndGet();
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        list.add(user);
        log.info("[]恭喜  同学抢到礼品>>>>>>>>>>>>>>>>>>>> ", name, user);
    

结果验证

结果分析:

一共跑了100万数据,礼品数限制在10个,所以只会有10个同学抢到,其他的同学将被淘汰,注意点是并发问题,虽然AtomicInteger是原子类操作,但是线程是很快的,当增加到第10个同学的时候。第11个已经在第9个的时候就进入了余量抢购代码区,这个时候是统计数量就存在问题,那么就变成了11,就造成并发问题。

解决方案是,让逻辑代码自增在同一个方法去,并且保证方法是安全的,一次只能处理一个请求。 

 

结束语

温馨提示:如有问题,可在下方留言,作者看到了会第一时间回复!

本章节完成了,各位正在努力的程序员们,如果你们觉得本文章对您有用的话,你学到了一些东西,希望猿友们点个赞+关注,支持一下猿仁!
持续更新中…

 

以上是关于SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)的主要内容,如果未能解决你的问题,请参考以下文章

Rabbitmq之整合Springboot,普通队列以及死信队列demo实例,队列优化以及插件实现延迟队列

RabbitMQ—SpringBoot中实现死信队列

SpringBoot整合RabbitMQ实现死信队列

SpringBoot+RabbitMQ 死信队列

SpringBoot+RabbitMQ 死信队列

SpringBoot+RabbitMQ 死信队列