MQ4万字保姆教程|RabbitMQ知识点整理与Springboot整合附Demo(图文并茂)
Posted BugGuys
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ4万字保姆教程|RabbitMQ知识点整理与Springboot整合附Demo(图文并茂)相关的知识,希望对你有一定的参考价值。
导读
【4万字保姆级教程】本文详细的从应用层面上讲解了RabbitMQ的使用以及整合Springboot;对于其概念进行讲解,提供了可以完成日常开发的接口与demo;
工作队列
1. work Queues
工作队列(又称任务队列)的主要目的是避免立即执行资源密集型任务,且等待执行完成。我们可以将任务放入队列中,后台运行的工作进程将任务取出并执行,当有多个工作线程时,这些线程将一起处理任务。
轮询分发消息
一个生产者发送到一个队列中,且由多个工作线程去处理。一个消息只能被处理一次,多个工作线程是竞争的关系。
一个生产者: 发送10条消息
public class Task {
public static final String Queue_Name="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
for (int i = 0; i < 10; i++) {
channel.basicPublish("", Queue_Name, null, (message+"_"+i).getBytes());
}
System.out.println("消息发送完毕");
}
}
启动3个消费者
public class Worker {
public static final String Queue_Name = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/*
参数1:消费哪个队列
参数2:消费成功后是否要自动应答 true自动应答,false手动应答
参数3:消息成功消费的回调
参数4:消息消费异常的回调
*/
channel.basicConsume(Queue_Name, true, (consumerTag, message)-> {
System.out.println(args[0]+"tag:"+consumerTag+" message:"+new String(message.getBody()));
},(consumerTag)->{
System.out.println(args[0]+"消息被消费中断:"+consumerTag);
});
}
}
执行结果(轮询非有序)
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_0
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_2
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_1
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_5
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_3
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_8
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_4
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_6
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_7
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_9
2. 消息应答
概念
若工作线程突发异常中断,那么我们可能将丢失正在处理的消息。因此mq引入了一种消息应答机制,保证消费者在处理消息后,告诉MQ已经处理,可以将消息删除。
自动应答
消息发送后立即被认为已经发送成功。
需要在高吞吐量和数据传输安全性方面做权衡,仅适用在消费者可以高效以某种速率处理消息的情况下使用;
该模式可能因为消费者channel关闭造成消息丢失以及未对发送消息数量进行限制导致消息发送过载等风险;
手动应答
推荐手动应答
Channel.basicAck(long tag, boolean multiple) // 肯定确认 mq确定消息成功处理,可以删除
Channel.basicNack(long tag, boolean multiple, boolean requeue) // 不确定 mq不确定消息是否处理
Channel.basicReject(long tag, boolean requeue) // 不处理消息,直接拒绝,直接丢弃,不能批量处理
tag:消息标识;message.getEnvelope().getDeliveryTag();
multiple的true和false,见下面的代码
true:批量应答Channel上未应答的消息,当channel上传tag送的消息是1,2,3,4,当tag=4被确认时,1-3也会被确认收到消息应答;
false:只会应答当前消息,其余消息不会被应答;
requeue:true,拒绝的消息重新入消费队列;
消息自动重新入队
如果消费者异常断开,导致消息未发送ACK确认,MQ将其消息重新排队,很快发送给另一个消费者,保证消息的不丢失;
// 手动确认代码
channel.basicConsume(Queue_Name, false, (consumerTag, message)-> {
System.out.println(args[0]+"tag:"+consumerTag+" message:"+new String(message.getBody()));
/*
应答信道消息
参数2:是否批量确认
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println(args[0]+"消息被消费中断:"+consumerTag);
});
3. RabbitMQ持久化
默认情况下,RabbitMQ因异常导致未成功消费的消息丢弃,若需要持久化需要将队列和消息都标记为持久化;
设置队列持久化
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, true, false, false, null);
注意:若原先声明的队列不是持久化的,启动会报错,需要将原先队列删除后重新创建持久化队列。
设置消息持久化
// 第三个参数:MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
将消息持久化并不能完全保证消息不丢失,因为消息当准备存储在磁盘的时候还未完全写入,消息还在缓存时的一个间隔点,此时并没有真正写入磁盘,持久性无法完全保证,需要参考后续的确认。
不公平分发
RabbitMQ默认情况是使用轮询消费消息,但如果多个消费者的处理速度不一致,就会导致慢的消费者影响到了快的消费者执行,因此需要有一种能者多劳的分发方式。
设置参数:默认是0,表示轮询分发;作用于手动确认的消费者;
设置prefetchCount = 3。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理3个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它;
int prefetchCount = 1;// 预取值,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
channel.basicQos(prefetchCount);
代码:默认发送10条消息,用两个(一快一慢)消费者消费;
private static void createFasterConsumer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 默认0是轮询
channel.basicQos(1);// 预取值为1,表示预取值1个消息
// 手动应答1s
channel.basicConsume(Queue_Name, false, (consumerTag, message)-> {
System.out.println("快的tag:"+consumerTag+" message:"+new String(message.getBody()));
ThreadUtils.sleep(1);
/*
应答信道消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println("快的消息被消费中断:"+consumerTag);
});
}
private static void createSlowerConsumer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 默认0是轮询
channel.basicQos(1);// 预取值为1,表示预取值1个消息
// 手动应答10s
channel.basicConsume(Queue_Name, false, (consumerTag, message)-> {
System.out.println("慢的tag:"+consumerTag+" message:"+new String(message.getBody()));
ThreadUtils.sleep(10);
/*
应答信道消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},(consumerTag)->{
System.out.println("慢的消息被消费中断:"+consumerTag);
});
}
执行结果:证明快的消费者消费数量多
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_0
慢的tag:amq.ctag-pvROpH3ChDTW8gl0qnbU1A message:hello world_1
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_2
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_3
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_4
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_5
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_6
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_7
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_8
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_9
预取值
由于消息本身发送是异步的,消息的接收处理也是异步的。因此消费者就有一个未确认的消息缓冲区。避免消息堆积在缓冲区中,无法消费。我们可以通过channel.basicQos(N)
来设置未确认的消息缓冲区的大小。该值定义通道上允许的未确认消息的最大数量。一旦数量达到了配置的数量,MQ将停止继续向该channel发送消息,除非收到了消息确认。通常,增加预取值将提高向消费者传递消息的速度,虽然自动应答传输消息的速率是最佳的,但是,该情况下已传递但尚未处理的消息数量也会增加,从而增加了消费则的RAM消耗(随机存储器)。也就是消费者内存的消耗。
举例,同样上面的场景代码,默认发送10条消息,用两个(一快一慢)消费者消费;快的basicQos(1),慢的basicQos(6),可以观察到,慢的缓冲区确实有6条,快的消费了4条;
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_0
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_1
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_7
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_8
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_9
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_2
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_3
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_4
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_5
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_6
4. 发布确认
生产者将信道设置成confirm
模式,在该模式下,信道上的所有消息都会有一个唯一的UID(从1递增)。当消息投递到所有匹配的队列时,broker 就会发送包含UID的确认消息给生产者,让生产者知道消息正确到达目的队列了;
channel.confirmSelect();
若消息和队列是持久化的,那么确认消息会在写入磁盘后发出,回传确认消息的delivery-tag
域包含UID;
broker 也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;
优势:
- confirm模式是异步的,一旦发布一条消息,生产者可以等待ack的同时发送下一条消息,当这条消息最终收到确认后,生产者可以通过回调方法来确认该消息;
- 若因为MQ的问题导致消息丢失,就会发送一条nack消息,生产者同样可以通过回调处理nack消息;
单个确认发布
是一种同步确认发布的方式,当一个消息发布后收到确认了才会继续发布后续消息。waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间内没有返回则会抛异常。
缺点:发布速度慢,当一条消息没有确认发布,就会阻塞后面的消息发布;
实验:使用单个确认发布,发布1000条消息 最终耗时 3350ms;
public static void publishMessageBySync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
boolean b = channel.waitForConfirms();
if (b){
System.out.println("发送成功");
} else {
System.out.println("-----发送失败------");
}
}
long end = System.currentTimeMillis();
System.out.println("消息发送完毕 "+ (end - start));
}
批量确认发布
与单个确认发布相比,批量确认发布可以极大的提高吞吐量;
缺点:当发生故障导致确认发布出现问题时,无法确定是哪一条消息出现了故障;
该方案仍是同步的,依旧会出现阻塞发布消息;
实验:生产者发送1000个消息,每100次确认一回,总共耗时62ms
public static void publishMessageByBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
/*
参数1:交换机名称,默认""
参数2:路由的Key值是哪个 本次是队列名称
参数3:其他参数
参数4:消息体
*/
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
if (i % 100 == 0) {
boolean b = channel.waitForConfirms();
if (b){
System.out.println("发送成功");
} else {
System.out.println("-----发送失败------");
}
}
}
long end = System.currentTimeMillis();
System.out.println("消息发送完毕 "+ (end - start));
}
异步确认发布
利用回调函数来达到消息的可靠传递。通过中间件也是通过函数回调来保证是否投递成功;他将消息放入一个容器中,每个消息都有一个UID,每次异步确认发布,可以保证消息是否成功确认发布;
通常,我们需要通过使用并发容器ConcurrentSkipListMap
存储所有发送的消息,然后每次确认后将容器中的对应消息删除,剩下的消息就是未确认的消息;
实验:生产者发送1000条消息,异步监听确认发布,耗时36ms。通过日志发现,每次发送deliveryTag都从1开始,并且multiple同时存在true和false;
public static void publishMessageByAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 生成一个队列
/*
String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
参数1:队列名
参数2:队列消息是否持久化,默认false存储在内存中
参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
参数5:其他参数
*/
channel.queueDeclare(Queue_Name, false, false, false, null);
// 发消息
String message = "hello world";
// 高并发下的hashMap
ConcurrentSkipListMap<Long, String> confirmMap = new ConcurrentSkipListMap<>();
// deliveryTag 消息编号,multiple是否批量确认
// 消息确认成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println(String.format("确认的消息 deliveryTag:%d multiple:%s", deliveryTag, multiple));
// 如果是批量删除需要使用headMap
if (multiple) {
ConcurrentNavigableMap<Long, 四万字32图,Kafka知识体系保姆级教程宝典
Flink保姆级教程,超全五万字,学习与面试收藏这一篇就够了