SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列死信备份交换机
Posted 土味儿~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列死信备份交换机相关的知识,希望对你有一定的参考价值。
一、应用场景
- 业务中心根据业务需求向特定用户发送消息;发送前不确定由哪个用户接收
- 特定用户接收特定消息;用户可以退出,再切换别的用户登录,用户登录后只接收与自已对应的消息
二、总体要求
项目要足够稳健,消息不能丢失
-
交换机、队列、消息持久化
-
队列有容量限制;如:3000
-
消息发送后需要确认(非自动确认)
-
未发送成功的消息,由缓存保存,定时重发
-
交换机收到消息,但无法投递时,转发至备份交换机,再广播至对应队列
-
费时操作采用异步方式
三、架构图
四、安装RabbitMQ
参考如下三篇文章
五、搭建SpringBoot项目
java1.8
spring-boot 2.6.7
1、依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.tuwer</groupId>
<artifactId>mq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- amqp-client -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- amqp-client Java原生依赖 -->
<!-- <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>-->
<!-- hutool-all -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.2</version>
</dependency>
<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.3</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2、application.yml
spring:
rabbitmq:
host: 192.168.3.174
port: 5672
username: admin
password: admin
virtual-host: /
# 交换机接收确认
publisher-confirm-type: correlated
# 交换机回退消息
#publisher-returns: true
2、启动类
@EnableAsync
开启异步操作
package com.tuwer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* @author 土味儿
* Date 2023/1/4
* @version 1.0
*/
@EnableAsync
@SpringBootApplication
public class MqApp
public static void main(String[] args)
SpringApplication.run(MqApp.class, args);
3、基础类
3.1、常量类
package com.tuwer.constant;
/**
* <p>系统常量类</p>
*
* @author 土味儿
* Date 2023/1/4
* @version 1.0
*/
public class Constants
/**
* 队列容量、通道预取值
* 队列容量应根据项目需要,设置合适的值;
* 本案例中为了测试方便设为5
*/
public static final int QUEUE_CAPACITY = 5;
public static final int PRE_FETCH_SIZE = 10;
/**
* 交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String BACKUP_EXCHANGE = "backup_exchange";
/**
* 队列
*/
public static final String BACKUP_QUEUE = "backup_queue";
3.2、雪花算法工具类
获取Long型id:
SnowflakeUtil.getInstance().nextId()
package com.tuwer.util;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
/**
* <p>雪花算法工具类</p>
*
* @author 土味儿
* Date 2022/6/2
* @version 1.0
*/
@Slf4j
@SuppressWarnings("all")
public class SnowflakeUtil
// ==============================Fields===========================================
/**
* 开始时间戳 (2000-01-01 00:00:00)
*/
private static final long TWEPOCH = 946656000000L;
/**
* 机器id所占的位数 5
*/
private static final long WORKER_ID_BITS = 5L;
/**
* 数据标识id所占的位数 5
*/
private static final long DATA_CENTER_ID_BITS = 5L;
/**
* 支持的最大机器id,结果是 31
*/
private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
/**
* 支持的最大数据标识id,结果是 31
*/
private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);
/**
* 序列在id中占的位数
*/
private static final long SEQUENCE_BITS = 12L;
/**
* 机器ID向左移12位
*/
private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
/**
* 数据标识id向左移17位(12+5)
*/
private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
/**
* 时间戳向左移22位(5+5+12)
*/
private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;
/**
* 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
*/
private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);
/**
* 步长 1024
*/
private static final long STEP_SIZE = 1024;
/**
* unsigned int max value
*/
private static final long UINT_MAX_VALUE = 0xffffffffL;
/**
* 工作机器ID(0~31)
*/
private long workerId;
/**
* 工作机器ID 计数器
*/
private long workerIdFlags = 0L;
/**
* 数据中心ID(0~31)
*/
private long dataCenterId;
/**
* 数据中心ID 计数器
*/
private long dataCenterIdFlags = 0L;
/**
* 毫秒内序列(0~4095)
*/
private long sequence = 0L;
/**
* 毫秒内序列基数[0|1024|2048|3072]
*/
private long basicSequence = 0L;
/**
* 上次生成ID的时间戳
*/
private long lastTimestamp = -1L;
/**
* 工作模式
*/
private final WorkMode workMode;
public enum WorkMode NON_SHARED, RATE_1024, RATE_4096;
//==============================单例模式(静态内部类)=====================================
private static class InnerClass
private static final SnowflakeUtil INNER_DEMO = new SnowflakeUtil();
public static SnowflakeUtil getInstance()
return InnerClass.INNER_DEMO;
//==============================Constructors=====================================
public SnowflakeUtil()
this(0, 0, WorkMode.RATE_4096);
/**
* 构造函数
*
* @param workerId 工作ID (0~31)
* @param dataCenterId 数据中心ID (0~31)
*/
public SnowflakeUtil(long workerId, long dataCenterId)
this(workerId, dataCenterId, WorkMode.RATE_4096);
/**
* 构造函数
*
* @param workerId 工作ID (0~31)
* @param dataCenterId 数据中心ID (0~31)
* @param workMode 工作模式
*/
public SnowflakeUtil(long workerId, long dataCenterId, WorkMode workMode)
this.workMode = workMode;
if (workerId > MAX_WORKER_ID || workerId < 0)
throw new IllegalArgumentException(MessageFormat.format("worker Id can't be greater than 0 or less than 0", MAX_WORKER_ID));
if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0)
throw new IllegalArgumentException(MessageFormat.format("datacenter Id can't be greater than 0 or less than 0", MAX_DATA_CENTER_ID));
this.workerId = workerId;
this.workerIdFlags = setSpecifiedBitTo1(this.workerIdFlags, this.workerId);
this.dataCenterId = dataCenterId;
this.dataCenterIdFlags = setSpecifiedBitTo1(this.dataCenterIdFlags, this.dataCenterId);
// ==============================Methods==========================================
/**
* 获取机器id
*
* @return 所属机器的id
*/
public long getWorkerId()
return workerId;
/**
* 获取数据中心id
*
* @return 所属数据中心id
*/
public long getDataCenterId()
return dataCenterId;
/**
* 获得下一个ID (该方法是线程安全的)
*
* @return SnowflakeId
*/
public synchronized long nextId()
long timestamp = timeGen();
//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
if (timestamp < this.lastTimestamp)
if (timestamp > TWEPOCH)
if (WorkMode.NON_SHARED == this.workMode)
nonSharedClockBackwards(timestamp);
else if (WorkMode.RATE_1024 == this.workMode)
rate1024ClockBackwards(timestamp);
else
throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for 0 milliseconds", lastTimestamp - timestamp));
else
throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for 0 milliseconds", lastTimestamp - timestamp));
//如果是同一时间生成的,则进行毫秒内序列
if (this.lastTimestamp == timestamp)
this.sequence = (this.sequence + 1) & SEQUENCE_MASK;
//毫秒内序列溢出
if (this.sequence == 0)
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(this.lastTimestamp);
//时间戳改变,毫秒内序列重置
else
this.sequence = this.basicSequence;
//上次生成ID的时间戳
this.lastTimestamp = timestamp;
//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
| (this.dataCenterId << DATA_CENTER_ID_SHIFT)
| (this.workerId << WORKER_ID_SHIFT)
| this.sequence;
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
*
* @param lastTimestamp 上次生成ID的时间戳
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp)
long timestamp0;
do
timestamp0 = timeGen();
while (timestamp0 <= lastTimestamp);
return timestamp0;
/**
* 返回以毫秒为单位的当前时间
*
* @return 当前时间(毫秒)
*/
protected long timeGen()
return System.currentTimeMillis();
/**
* 尝试解决时钟回拨<br>【* 仅用于 单机生成不对外 的情况 *】
*
* @param timestamp 当前时间戳
* @return void
*/
private void nonSharedClockBackwards(long timestamp)
if (this.dataCenterIdFlags >= UINT_MAX_VALUE && this.workerIdFlags >= UINT_MAX_VALUE)
throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for 0 milliseconds", lastTimestamp - timestamp));
else
//如果仅用于生成不重复的数值,尝试变更 dataCenterId 或 workerId 修复时钟回拨问题
log.warn("Clock moved backwards. Refusing to generate id for milliseconds", lastTimestamp - timestamp);
//先尝试变更 dataCenterId,当 dataCenterId 轮询一遍之后,尝试变更 workerId 并重置 dataCenterId
if (this.dataCenterIdFlags >= UINT_MAX_VALUE)
if (++this.workerId > MAX_WORKER_ID)
this.workerId = 0L;
this.workerIdFlags = setSpecifiedBitTo1<以上是关于SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列死信备份交换机的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot整合RabbitMQ之典型应用场景实战三
SpringBoot整合RabbitMQ之典型应用场景实战一