RabbitMQ 消息队列学习
Posted IT_Holmes
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 消息队列学习相关的知识,希望对你有一定的参考价值。
文章目录
- 1. 工作队列(Work Queues) 消息应答
- 2. RabbitMQ 持久化
- 3. 不公平分发 策略
- 4. 预去值 策略
- 5. 发布确认
- 6. 如何处理异步发布确认的 未确认消息
- 7. 三种确认发布 对比
1. 工作队列(Work Queues) 消息应答
1.1 什么是 消息应答机制?
rabbitmq一旦向消费者传递了一条信息,便立即将该消息标记为删除。这样如果有一个消费者处理了一个长的任务并且只完成了部分突然挂掉了,这种情况就很尴尬。
因此,为了保证消息在发送过程中不丢失,rabbitmq引入了消息应答机制。
消息应答就是:
消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
1.2 自动应答 和 手动应答
自动应答:就是消费者接收到消息后,自动告诉rabbitmq服务该消息已完成,实际上消费者也仅仅是接受到了消息,有可能还没有运行完成。
因此,自动应答比较适合环境不容易出错的情况下。
手动应答:可以设置方法,来让rabbitmq得知相关信息。
Channel.basicAck方法 //用于肯定确认,Ack英文直译应答信号的意思。
Channel.basicNack//用于否定确认,Nack英文直译否定应答的意思。
Channel.basicReject//也是用于否定确认,与上面区别如下:
/*
Channel.basicReject与Channel.basicNack相比较少了一个参数
该参数是Multiple批量处理。
也就是channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息。
*/
对Channel.basicNack中参数multiple,是针对批量应答的:
设置为true和false,对rabbitmq服务消息的不同效果:
- 设置为true,就是应答了第一个消息,则此信道channel当中的所有消息都应答了。
- 设置为false,就是仅仅只会应答对应的消息。建议设置为false。
1.3 消息 自动重新入队 过程原理
结合图理解消息自动重新入队原理:
- 图一:消息1给了C1,消息2给了C2。
- 图二:此时,消息1失去连接,C1未发送ack。而C2成功处理消息,并且返回ack。
- 图三:因为,C1失去连接并且没有发送ack,所以队列中的消息1不会被标记删除。而C2返回了ack,因此消息2就会被标记删除。此时,队列中就只有消息1了。
- 图四:之后,因为C1失去了连接,消息1就会被发送到C2进行处理。
1.4 消息 手动应答 代码测验
生产者代码:
package com.itholmes.rabbitmq.three;
import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 生产者代码
*
*/
public class Producer
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//发送消息
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMqUtils.getChannel();
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
//从控制台中输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String message = scanner.next();
//对于中文可以加上UTF-8,这样不会乱码
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生茶这发出消息:"+message);
消费者1代码:
package com.itholmes.rabbitmq.three;
import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接受消息
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接受消息处理(时间短)");
DeliverCallback deliverCallback = (var1,var2)->
//睡眠1秒
try
Thread.sleep(1000);//1秒
catch (InterruptedException e)
e.printStackTrace();
//同样这里可以设置为utf-8
System.out.println("接受到的消息:"+new String(var2.getBody(),"UTF-8"));
/**
* channel.basicAck()进行手动应答代码:
* 参数1:消息的标记 tag
* 参数2:是否批量应答
*/
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
;
CancelCallback cancelCallback = (var1)->
System.out.println(var1+"消费者取消消费接口回调逻辑。");
;
//采用手动应答
boolean autoAck = false;//设置为false,不自动也就是手动
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
消费者2代码:
package com.itholmes.rabbitmq.three;
import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接受消息
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接受消息处理(时间长)");
DeliverCallback deliverCallback = (var1,var2)->
//睡眠30秒,模拟当前消费者宕机后
try
Thread.sleep(30000);//30秒
catch (InterruptedException e)
e.printStackTrace();
//同样这里可以设置为utf-8
System.out.println("接受到的消息:"+new String(var2.getBody(),"UTF-8"));
/**
* channel.basicAck()进行手动应答代码:
* 参数1:消息的标记 tag
* 参数2:是否批量应答
*/
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);
;
CancelCallback cancelCallback = (var1)->
System.out.println(var1+"消费者取消消费接口回调逻辑。");
;
//采用手动应答
boolean autoAck = false;//设置为false,不自动也就是手动
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
通过启动producter,consumer1和consumer2。发送消息,rabbitmq会轮询依次发给两个消费者;此时我们终止consumer2,rabbitmq没有收到ack,并且检测到consumer2已经断开连接,就会重新将消息分配,也就是分配给了consumer1。
2. RabbitMQ 持久化
2.1 队列持久化
如何保障当Rabbitmq服务停掉或者由于某种原因崩溃后消息生产者已经发送过来的消息不丢失。
- 解决办法:可以将队列和消息都标记为持久化。
队列持久化:
- 在声明队列的时候,设置参数durable为true,表示让队列持久化。
//队列持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
有可能遇到如下错误问题:
- 需要注意的是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化队列,不然就会出现如下错误。
如何删除队列?
- 可以直接去rabbitmq web页面进行删除。
- 也可以通过 channel.queueDelete(QUEUE_NAME)进行删除。
队列持久化后,web页面就会有D的字样:
2.2 消息持久化
区分队列和消息,队列持久化了不代表队列里面的消息也持久化了。
消息的持久化也是需要配置参数。
因为队列持久化和消息持久化都是在生产者这边设置参数:
package com.itholmes.rabbitmq.three;
import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 生产者代码
*
*/
public class Producer
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//发送消息
public static void main(String[] args) throws IOException, TimeoutException
Channel channel = RabbitMqUtils.getChannel();
/**
* 队列持久化
*/
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String message = scanner.next();
/**
* 设置生产者发送消息为持久化消息(就是要求保存到磁盘上)
*/
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println("生茶这发出消息:"+message);
因为,消息可能刚准备存储磁盘,还处于缓存的一个间隔点就可能宕机。因此这种情况持久化并不是保证的,更强有力的持久化策略是发布确认。
3. 不公平分发 策略
轮询分发策略,其实在大多数情况下并不是适用。就比如两个消费者,一个处理快一个处理慢,这样处理快的消费者就有大量时间处于空闲状态,效率就很慢。
为了避免上面这种情况,可以设置参数channel.basicQos(1);
- 参数设置为1,是不公平分发。
- 参数设置为0,是轮询分发(也是默认的)。
- 参数设置为大于1的,是对应的预去值。
要注意的是设置不公平分发,并不是在生产者设置,而是在消费者设置:
//设置不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);
不公平分发是经常使用的一种分发策略。
4. 预去值 策略
prefetch英文直译:预先载入。
预去值设置:
//设置大于1的预去值(0是轮询,1是不公平分发)
int prefetchCount = 5;
channel.basicQos(prefetchCount);
如果一口气发送了7条数据,首先两条进入C1,之后的5条进入C2。因为C2处理的非常慢,所以C2的信道channel会堆积多条消息。
5. 发布确认
5.1 发布确认 过程原理
想要保证消息安全持久化了,就必须要使用发布确认。
发布确认持久化的过程:
- 1.队列设置为持久化。
- 2.消息设置为持久化。
- 3.设置发布确认。
5.2 开启 发布确认
在生产者发消息之前,要开启发布确认:
- 默认是没有开启的。
Channel channel = RabbitMqUtils.getChannel();
/**
* 开启发布确认
*/
channel.confirmSelect();
确认发布有三种策略:
- 单个确认发布。
- 批量确认发布。
- 异步确认发布。
5.3 单个确认发布
单个确认发布:是一种简单,同步确认发布方式。
发布一个消息之后,只有它被确认发布后,后续的消息才能继续发布。
/**
* 单个消息就马上进行发布确认:
* 生产者的信道channel需要加入waitForConfirms()方法。
*/
boolean flag = channel.waitForConfirms();
waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
缺点:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布。
生产者案例代码:
package com.itholmes.rabbitmq.four;
import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* 发布确认模式
* 1.单个确认
* 2.批量确认
* 3.异步批量确认
*/
public class ConfirmMessage
public static final int MESSAGE_COUNT = 100;
public static void main(String[] args) throws Exception
//1.单个确认
pulishMessageIndividually();
//测试结果为:发布100个单独确认消息,耗时3075ms(毫秒),用来比较其他发布确认策略
//单个确认
public static void pulishMessageIndividually() throws IOException, TimeoutException, InterruptedException
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++)
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
/**
* 单个消息就马上进行发布确认
*/
boolean flag = channel.waitForConfirms();
if (flag)
System.out.println("消息发送成功");
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end-begin)+"ms(毫秒)");
5.4 批量确认发布
先发布一批消息,然后一起确认可以极大地提高吞吐量。就是一批一批的确认。
/**
* 批量确认消息大小
*/
int batchSize = 10;
//有时会确认未确认消息个数
//批量发送消息,批量发布确认
for (int i = 0; i < MESSAGE_COUNT; i++)
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
/**
* 判断达到10条消息的时候,批量确认一次
*/
if (i%batchSize==0)
channel.waitForConfirms();
缺点:
- 当发生故障导致发布出现问题时,不知道是哪个消息出现问题了。
- 因此,必须将整个批处理保存在内存中,以记录重要信息而后重新发布消息。
- 这种方案依然是同步的,也一样阻塞消息的发布。
生产者案例代码:
package com.itholmes.rabbitmq.four;
import com.itholmes.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* 发布确认模式
* 1.单个确认
* 2.批量确认
* 3.异步批量确认
*/
public class ConfirmMessage
public static final int MESSAGE_COUNT = 100;
public static void main(String[] args) throws Exception
//2.批量确认
pulishMessageBatch();
//测试结果为:发布100个批量确认消息,耗时435ms(毫秒)
//批量发送确认
public static void pulishMessageBatch() throws IOException, TimeoutException, InterruptedException
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
/**
* 批量确认消息大小
*/
int batchSize = 10;
//有时会确认未确认消息个数
//批量发送消息&以上是关于RabbitMQ 消息队列学习的主要内容,如果未能解决你的问题,请参考以下文章