SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)
Posted 猿仁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)相关的知识,希望对你有一定的参考价值。
目录
开始语
一位普通的程序员,慢慢在努力变强!
SpringBoot集成RabbitMQ之ACK确认机制(第三节)
📝简述
死信队列(Dead-Letter-Exchange)可以理解为是消息处理失败的接收者,其实也是一个普通的队列,当消息处理失败了,消息过期了,消息达到最大数量,消息被拒绝等...一系列的问题都可以由死信队列来接收,来处理。
死信队列的设置方法就是设置参数(x-dead-letter-exchange)来对死信队列进行一个捆绑,而捆绑的这个队列称为【死信队列】其实就是一个交换机路由队列,被捆绑死信的队列会有一个标识【DLX】,和普通的队列交换机没啥区别,就是设置了一个属性(Argument),当符合商务消息处理失败的一些条件时,就由死信队列来接收,想要实现死信队列,只需要在定义队列的时候加上一个参数即可:
从上述图片可以看出,我们定义的死信队列【letter_queue】其实也就是一个交换机-普通队列,队列【error_queue】将被标识被DLX,当消费者消费异常时,那么就有死信死信队列【letter_queue】进行消费消息。
简单来讲,就是消费者最终消费失败的时候,由死信队列来消费次消息
🖋️代码实现-死信队列
application配置
spring:
application:
name: rabbitmq-deadLetter
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 20000
listener:
simple:
prefetch: 50 # 队列限流50,其余消息给别的消费者消费
max-concurrency: 50
concurrency: 1
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3 # 默认是3,是一共三次,而不是重试三次,三次包含了第一执行,所以只重试了两次
initial-interval: 2000 # 重试间隔时间。毫秒
profiles:
active: dev
server:
port: 9090
Argument枚举
/**
* 队列参数枚举
*
* @author tianyu.Ge
* @date 2023/2/1 15:50
*/
public enum ArgumentEnum
X_MESSAGE_TTL("x-message-ttl", "发送到队列的消息在丢弃之前可以存活多长时间(毫秒)"),
X_EXPIRES("x-expires", "队列在被自动删除(毫秒)之前可以使用多长时间"),
X_MAX_LENGTH("x-max-length", "队列在开始从头部删除之前可以包含多少就绪消息"),
X_MAX_LENGTH_BYTES("x-max-length-bytes", "队列在开始从头部删除之前可以包含的就绪消息的总体大小"),
X_DEAD_LETTER_EXCHANGE("x-dead-letter-exchange", "设置队列溢出行为,消息被拒绝或过期,将重新发布这些名称(死信队列)"),
X_DEAD_LETTER_ROUTING_KEY("x-dead-letter-routing-key", "(死信队列)-key"),
X_MAX_PRIORITY("x-max-priority", "队列支持的最大优先级数;如果未设置,队列将不支持消息优先级,数字越大越优先"),
X_QUEUE_MODE("x-queue-mode", "default[默认:内存] 和 lazy[存硬盘] 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息"),
X_QUEUE_MASTER_LOCATOR("x-queue-master-locator", "将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则");
public static final String X_MESSAGE_TTL_CONSTANT = "x-message-ttl";
public static final String X_EXPIRES_CONSTANT = "x-expires";
public static final String X_MAX_LENGTH_CONSTANT = "x-max-length";
public static final String X_MAX_LENGTH_BYTES_CONSTANT = "x-max-length-bytes";
public static final String X_DEAD_LETTER_EXCHANGE_CONSTANT = "x-dead-letter-exchange";
public static final String X_DEAD_LETTER_ROUTING_KEY_CONSTANT = "x-dead-letter-routing-key";
public static final String X_MAX_PRIORITY_CONSTANT = "x-max-priority";
public static final String X_QUEUE_MODE_CONSTANT = "x-queue-mode";
public static final String X_QUEUE_MASTER_LOCATOR_CONSTANT = "x-queue-master-locator";
public String name;
public String mess;
private ArgumentEnum(final String name, final String mess)
this.name = name;
this.mess = mess;
User对象
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author 猿仁
* @date 2023-01-31 09:38
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable
private String name;
死信队列
import com.net.model.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 死信队列
*
* @author tianyu.Ge
* @date 2023/2/1 11:47
*/
@Component
@Slf4j
public class DeadLetterExchangeCustomer
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "letter_queue_123"),
exchange = @Exchange("letter.exchange"),
key = "letter"))
public void letterQueue123(User data, Channel channel, Message message) throws IOException
log.info("[letterQueue123]死信队列开始消费异常通信>>>>>>>>>>>>>>>>>>>> ", data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
生产者
/**
* 第四种模式:路由模式-死信队列
*/
@Test
public void routeDeadLetterTest() throws InterruptedException
User user = new User("info1你好!");
rabbitTemplate.convertAndSend("route_direct", "info1", user);
// error消费出现异常,会造成死循环,
// 解决次问题:1.直接吃掉异常,不要抛出,记录日志或者存库,2.开启消费者重试模式,限定次数
user = new User("error1你好!");
rabbitTemplate.convertAndSend("route_direct", "error1", user);
Thread.sleep(5000);
System.out.println("消费异常数据列表:"+RouteDeadLetterCustomer.errorCountDetail);
System.out.println("消费次数:"+RouteDeadLetterCustomer.errorCount);
消费者
import com.net.model.entity.User;
import com.net.model.enums.ArgumentEnum;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 第四种模型:路由模式
* 死信队列之:重试次数达到最大次数,死信开始消费
*/
@Component
@Slf4j
public class RouteDeadLetterCustomer
public static final String INFO = "info1";
public static final String ERROR = "error1";
public static Map<Integer, String> errorCountDetail = new HashMap<>();
public static int errorCount = 0;
/**
* 死信队列使用:重试除数达到最大
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = RouteDeadLetterCustomer.ERROR + "_queue"
, arguments = @Argument(name = ArgumentEnum.X_DEAD_LETTER_EXCHANGE_CONSTANT, value = "letter.exchange"), // 死信交换机
@Argument(name = ArgumentEnum.X_DEAD_LETTER_ROUTING_KEY_CONSTANT, value = "letter") // 绑定死信队列
),//绑定临时队列,可以给名称,那么就不是临时队列
exchange = @Exchange(value = "route_direct", type = ExchangeTypes.DIRECT),//绑定交换机名称和类型(默认是direct类类型)
key = RouteDeadLetterCustomer.ERROR))
public void error(User user, Channel channel, Message message, @Headers Map<String, Object> headers) throws IOException
log.info("交换机模式【direct】,key【error】,开始消费:,Channel:, Message:, Headers :", user, channel, message, headers);
// 第二种限定重试次数为:3,抛出异常
try
++errorCount;
int a = 0 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
catch (Exception e)
System.out.println(e.getMessage());
// 不抛出异常,吃掉异常,记录日志
log.error("交换机模式【direct】,key【error】,消费异常:,消息体为:", e.getMessage(), user);
errorCountDetail.put(errorCount, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",user=" + user);
if (errorCount == 3)
log.info("[error]消费者消费次数已上限,当前次数为:次,接下来由死信队列消费:exchange=letter.exchange key=letter", errorCount);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
else
throw e;
@RabbitListener(bindings = @QueueBinding(value = @Queue,//绑定临时队列,可以给名称,那么就不是临时队列
exchange = @Exchange(value = "route_direct"),//绑定交换机名称和类型(默认是direct类类型)
key = RouteDeadLetterCustomer.INFO))
public void info(@Payload User user, Channel channel, Message message, @Headers Map<String, Object> headers) throws IOException
log.info("交换机模式【direct】,key【info】,开始消费:", user);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
结果验证
结果分析:
从结果上来看,消息info1是消费成功的,执行一次,而消费者error1是消费失败的,一共执行了三次,最终达到最大消费次数,从而由死信队列来进行消费! 消费的时间间隔也是设置的2秒一次。
由死信队列进行处理了,就不会造成消息丢失而找不到的情况,在死信队列中就可以做发送短信通知,记录消息异常让对应的开发进行处理!
🖋️代码实现-延时队列
延迟队列
import com.net.model.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 死信队列-延迟队列的使用
*
* @author tianyu.Ge
* @date 2023/2/1 11:47
*/
@Component
@Slf4j
public class TTLExchangeCustomer
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "letter_ttl_queue"),
exchange = @Exchange("letter.ttl.exchange"),
key = "letter.ttl"))
public void letterTtl(User data, Channel channel, Message message) throws IOException
log.info("[letterTtl]死信队列开始消费异常通信>>>>>>>>>>>>>>>>>>>> ", data);
log.info("[letterTtl]死信队列完成时间:", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
生产者
/**
* 第四种模式:路由模式-死信队列-延迟队列
*/
@Test
public void routeTTLTest() throws InterruptedException
User user = new User("TTL 你好!");
System.out.println("[routeTTLTest]发送时间时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
rabbitTemplate.convertAndSend("route_direct", "ttl.key", user);
Thread.sleep(5000);
消费者
import com.net.model.enums.ArgumentEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 延迟队列
*/
@Configuration
//@Component
@Slf4j
public class RouteTTLCustomer
/**
* 延迟队列
*/
@Bean
public Queue createQueue()
Map<String, Object> args = new HashMap<>(3);
// 设置消息存活时间
long expirationTime = 3000;
args.put(ArgumentEnum.X_MESSAGE_TTL_CONSTANT, expirationTime);
// 设置死信交换机
args.put(ArgumentEnum.X_DEAD_LETTER_EXCHANGE_CONSTANT, "letter.ttl.exchange");
// 设置死信key
args.put(ArgumentEnum.X_DEAD_LETTER_ROUTING_KEY_CONSTANT, "letter.ttl");
log.info("ttl_queue队列的存活时间为:ms",expirationTime);
return new Queue("ttl_queue", true, false, false, args);
/**
* 创建交换机
*/
@Bean
public DirectExchange createDirectExchange()
return new DirectExchange("route_direct", true, false);
/**
* 队列绑定交换机
*/
@Bean
public Binding createDirectBinding()
return BindingBuilder.bind(createQueue()).to(createDirectExchange()).with("ttl.key");
结果验证
结果分析:
通过设置队列的消息过期时间,在队列中消息存活超过3秒中,将会由死信队列进行消费。
🖋️代码实现-限流队列
限流队列
import com.net.model.enums.ArgumentEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列-限流队列
*/
//@Component
@Configuration
@Slf4j
public class RouteCurrentLimitingCustomer
/**
* 代码块2--------------------------------------------------------------
* 秒杀场景
*/
/**
* 创建队列
*/
@Bean
public Queue createQueue1()
Map<String, Object> args = new HashMap<>(3);
// 设置消息存活时间
long expirationTime = 5000;
args.put(ArgumentEnum.X_MESSAGE_TTL_CONSTANT, expirationTime);
// 设置最大容量
long maxLength = 10;
args.put(ArgumentEnum.X_MAX_LENGTH_CONSTANT, maxLength);
// 设置死信交换机
args.put(ArgumentEnum.X_DEAD_LETTER_EXCHANGE_CONSTANT, "letter.exchange");
// 设置死信key
args.put(ArgumentEnum.X_DEAD_LETTER_ROUTING_KEY_CONSTANT, "max.length.letter");
log.info("currentLimiting_queue队列的最大容量为:个!",maxLength);
return new Queue("currentLimiting_queue", true, false, false, args);
/**
* 创建交换机
*/
@Bean
public DirectExchange createDirectExchange1()
return new DirectExchange("route_direct", true, false);
/**
* 队列绑定交换机
*/
@Bean
public Binding createDirectBinding1()
return BindingBuilder.bind(createQueue1()).to(createDirectExchange1()).with("currentLimiting");
生产者
/**
* 第四种模式:路由模式-死信队列-限流队列
*/
@Test
public void currentLimitingTest() throws InterruptedException
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 15, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 1000000; i++)
final int a = i;
threadPoolExecutor.execute(() ->
rabbitTemplate.convertAndSend("route_direct", "arguments", "张三" + a);
);
Thread.sleep(400000);
System.out.println("此次抢购礼品人数:"+ CurrentLimitingExchangeCustomer.number);
System.out.println("原子类此次抢购礼品人数:"+ CurrentLimitingExchangeCustomer.count.get());
System.out.println("抢到礼品人数列表:"+ CurrentLimitingExchangeCustomer.list);
System.out.println("抢到礼品人数:"+ CurrentLimitingExchangeCustomer.yesRobCount);
System.out.println("未抢到礼品人数:"+ CurrentLimitingExchangeCustomer.noRobCount);
消费者-死信队列
import com.net.model.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 死信队列
*
* @author tianyu.Ge
* @date 2023/2/1 11:47
*/
@Component
@Slf4j
public class CurrentLimitingExchangeCustomer
// 抢到礼品人数
public static List<User> list = new ArrayList<>();
// 抢购人数
public static int number = 0;
// 抢购人数原子类
public static volatile AtomicInteger count = new AtomicInteger(0);
// 没抢到礼品人数
public static volatile AtomicInteger noRobCount = new AtomicInteger(0);
// 抢到礼品人数
public static volatile AtomicInteger yesRobCount = new AtomicInteger(0);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "letter_queue"),
exchange = @Exchange("letter.exchange"),
key = "max.length.letter"))
public void maxLengthLetter(User user, Channel channel, Message message) throws IOException
writeCompute(user, channel, message, "maxLengthLetter");
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "letter_queue"),
exchange = @Exchange("letter.exchange"),
key = "max.length.letter"))
public void maxLengthLetter2(User user, Channel channel, Message message) throws IOException
writeCompute(user, channel, message, "maxLengthLetter2");
/**
* 防止并发修改问题
*
* @param user
* @param channel
* @param message
* @param name
* @return void
* @author tianyu.Ge
* @date 2023/2/6 17:00
*/
public synchronized void writeCompute(User user, Channel channel, Message message, String name) throws IOException
number += 1;
count.incrementAndGet();
if (list.size() == 10)
noRobCount.incrementAndGet();
log.error("活动结束,无礼品! 同学不要灰心,下次肯定是你!", user.getName());
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
return;
yesRobCount.incrementAndGet();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
list.add(user);
log.info("[]恭喜 同学抢到礼品>>>>>>>>>>>>>>>>>>>> ", name, user);
结果验证
结果分析:
一共跑了100万数据,礼品数限制在10个,所以只会有10个同学抢到,其他的同学将被淘汰,注意点是并发问题,虽然AtomicInteger是原子类操作,但是线程是很快的,当增加到第10个同学的时候。第11个已经在第9个的时候就进入了余量抢购代码区,这个时候是统计数量就存在问题,那么就变成了11,就造成并发问题。
解决方案是,让逻辑代码自增在同一个方法去,并且保证方法是安全的,一次只能处理一个请求。
结束语
温馨提示:如有问题,可在下方留言,作者看到了会第一时间回复!
本章节完成了,各位正在努力的程序员们,如果你们觉得本文章对您有用的话,你学到了一些东西,希望猿友们点个赞+关注,支持一下猿仁!
持续更新中…
以上是关于SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)的主要内容,如果未能解决你的问题,请参考以下文章