手动的进行消息应答
Posted 杀手不太冷!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手动的进行消息应答相关的知识,希望对你有一定的参考价值。
消息应答
概念
现在有很多消费者线程同时访问RabbitMQ消息队列,如果其中的一个消费者取到RabbitMQ消息队列的一个消息之后,突然挂掉了,那么RabbitMQ消息队列中的这条消息是不是就丢失了呢?我们该怎么解决这个问题呢?
我们引入了消息应答机制,当消费者从RabbitMQ消息队列中取出消息并且成功的处理了之后,消费者会告诉RabbitMQ消息队列,说“我已经成功的处理了这条消息,你的这条消息可以删除了。”然后RabbitMQ消息队列就会把它里面的这条对应的消息给删除掉。
消息应答的方法
Channel.basicAck()调用该方法之后,消费者就告诉了RabbitMQ消息队列说“你的消息我已经成功处理了,现在你可以把这条信息给删除了”。
Channel.basicNack()和Channel.basicReject()这两个方法的作用都是告诉RabbitMQ消息队列说"你的消息我没有成功处理,所以你不要把这条信息给删除"。
应答方式分为两种:一种是自动应答,另外一种是手动应答。
如果是自动应答,可能会出现bug,为什么呢?因为自动应答就表示只要消费者一取到消息队列中的消息,就表示处理消息成功,RabbitMQ消息队列中就会删除掉这条消息。但是问题是,消费者取到消息之后,后面的代码才是处理消息的代码,如果后面的代码出现异常,那么这条消息就丢失了,所以我们工作中要使用非自动应答的方式,也即是手动应答的方式,即什么时候消费者处理消息成功,然后手动的告诉RabbitMQ消息队列“我已经处理信息成功了”。
"如果手动进行消息应答RabbitMQ中的消息不会丢失"的测试思路
首先需要一个生产者,这个生产者可以通过控制台连续的往RabbitMQ的消息队列中存放信息。然后需要两个消费者线程,一个消费者线程从RabbitMQ消息队列中取到消息的速度比较快是1s,另外一个消费者线程从RabbitMQ消息队列中取到消息的速度比较慢是100s,如果我们这这100s之内,把这个消费者线程宕机,那么这条消息就会被另外一个消费者取到,因为RabbitMQ消息队列中没有删除这条消息。
生产者代码
/**
* @Date 2021/11/10 19:14
* @Author 望轩
*
*
*/
public class Task02 {
//队列名称
public static final String QUEUE_NAME="ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.210.39");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从控制台当中接收信息
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String message=scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成"+message);
}
}
}
消费者代码
/**
* @Date 2021/11/10 19:21
* @Author 望轩
*/
public class Worker02 {
//队列名称
public static final String QUEUE_NAME="ack_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.210.39");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("C2等待接收消息处理时间较长");
//如果能成功接收到消息会调用的回调函数
DeliverCallback deliverCallback=(consumerTag, message)->{
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者接收到的消息:"+new String(message.getBody()));
//手动应答
/**
* 1.消息的标记tag,也就是我们是手动应答的消息队列中的哪个消息
* 2.是否批量应答,false:不批量应答消息队列中的消息 true:批量应答消息队列中的消息
*
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//如果取消从消息队列中获取消息时会调用的回调函数
CancelCallback cancelCallback= consumerTag->{
System.out.println(consumerTag+"消息消费被中断");
};
//采用手动应答:这里的第二个参数为false表示是使用手动应答的方式
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
上面是执行的获取消息比较快的消费者线程,另外一个线程也好获取,只要改上面代码的两个地方就行了,一个地方是把C2改成C1,另外一个地方是把睡眠时间从100s改成1s,然后重新启动一下这个main方法就能重新启动一个线程了,但是有条件,我们必须要给Worker02勾选一个允许多个线程同时运行的选项,如下图:
测试
启动生产者
首先启动生产者,如下图:
生产者启动之后,在RabbitMQ消息队列中会多出一个ack_queue队列,如下图:
启动消费者
然后启动两个消费者线程,一个消费者获取信息比较快是1s,另外一个消费者获取信息比较慢是100s,如下图:
生产者生产两个消息看看消费者的接收情况
在生产者生产两条消息到RabbitMQ消息队列,按道理来说这两条消息会轮询地发送到两个消费者线程中,但是我们故意在一个消费者线程处理这条消息的时候,把这个消费者线程down机,那么这两条消息全部都会发送给同一个消费者线程。如下图:
以上是关于手动的进行消息应答的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ——消息手动应答队列/消息持久化不公平分发预取值的概念理解及应用举例