RabbitMQ----消息可靠性传递
Posted 未来.....
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ----消息可靠性传递相关的知识,希望对你有一定的参考价值。
RabbitMQ的高级特性------消息可靠性传递
在开始之前,先想想几个问题,我们采用了生产者(channel)---->交换机------>队列中,那么如果其中生产者在向交换机发送信息是能一定发送成功吗?,该如何确认信息的状态,或者说交换机在向队列发送信息的时候发送失败,那么该如何解决。
举个例子:你购买了一件商品,商品要通过快递到你手里,就可以理解成你就是队列, 商品要到快递站,此时你的快递就有可能出现丢失的情况(这里可以理解成生产者(channel)---->交换机),快递丢了,是不是就要告诉一下商家,我没有收到快递,此时商家就会再发一个快递过来;
好,此时快递已经到达快递站了,快递小哥开始派送了,但是你一直没有收到快递, 就给快递小哥打电话,说我没有收到快递, 此时快递小哥又再次给你派送快递 (这里可以理解成交换机------>队列)。
上面这个例子就简单说明如果在商品运输的过程中出现丢失,那么会再次发送,直到确定你收到了快递。
此时就引入了一个一个名词,消息可靠性传递。
1、消息可靠性传递
那么如何保证消息可靠性传递呢?
此时就有两个模式:Confirm 确认模式和Return 退回模式
1、1 confirm 确认机制:
1.消息的确认,是指生产者投递消息后, 如果broker收到消息,则会给我们生产者一个应答
2.生产者进行接收应答,用来确定这条消息是否正常发送到broker, 这种方式也是可靠性投递的核心保障
接下来上代码:
创建一个spring boot项目,并在项目里面创建maven项目
在java包中创建一个启动类
@SpringBootApplication
public class ProductApp
public static void main(String[] args)
SpringApplication.run(ProductApp.class,args);
创建application.yml
文件
server:
port: 8001
spring:
rabbitmq:
#RabbitMQ的服务器IP地址
host: 192.168.31.33
# 开启rabbitMQ的生产方确认模式
# none:禁用发布确认模式
# correlated:发布消息成功到交换机后触发回调方法
publisher-confirm-type: correlated
这里只需要操作生产者(发送方)
为了方便测试,在测试类中创建测试单元
此时的交换机和队列需要使用现有的,如果没有【点这里】,有代码可以直接拿着用,去创建交换机和队列,我采用的是路由模式
提示:需要保证测试的交换机所绑定的队列是不能有消息的,否则里面的输出语句是不会被执行的。
@SpringBootTest
public class TestRabbitMq
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 保证发送方到交换机的可靠性
* 1、开启confirm模式 默认是关闭的
* 2、设置rabbitTemplate的确认回调函数,如果消息到达交换机则返回true
* 如果消息没有到达交换机则返回一个false
*/
@Test
public void testConfrim()
//确认回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
@Override
public void confirm(CorrelationData correlationData, boolean b, String s)
//需要队列中不能存在消息,否则此语句不会输出
System.out.println("信息是否发送到交换机"+b);
if(b==false)//消息没有到达交换机 根据业务需求。
System.out.println("继续发送消息");
//取消订单
);
rabbitTemplate.convertAndSend("ban_exchange_direct","error","hello confirm");
如果测试发送失败是否会输出,那么要保证你所要发送交换机的名字是不存在的,这样就可以保证一定是false,同时修改发送语句。
rabbitTemplate.convertAndSend("ban_direct","","hello confirm");
1、2 Return 退回机制
1.Return Listener 用于处理一些不可路由的消息, 也是生产端需要设置的, 我的的消息生产者,用过指定一个Exchange和Routingkey ,把消息送达到某一个对队列中去,然后我们的消费者监听队列,进行消费的处理操作;但是在某些情况下,如果我们在发送消息的时候, 当前的exchange 不存在或者指定的路由key 路由不到, 这个时候如果我们需要监听这种不可达的消息, 就要使用Return Listener;
同理,Return也是需要开启的,在刚才创建的yml
文件中添加开启Return 的语句
# 开启发布者退回模式
publisher-returns: true
同上,运行前也要保证队列中不能有信息,我采用的是路由模式的交换机。
/**
* 退回模式
* 1、开启退回模式
* 2、设置RabbitTemplate的退回回调函数
*/
@Test
public void testReturn()
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
//只要交换机到队列失败时才会触发该方法。失败之后可以继续发送也可以取消相应的业务功能。
System.out.println("消息从交换机到队列失败" + returnedMessage.getReplyText());
);
rabbitTemplate.convertAndSend("ban_exchange_direct", "abcd", "hello confirm2");
路由模式有routingKey
,只需要写一个你没有绑定的key,就可以完成这个测试。
这个模式只有在发送失败的时候才会触发,如果发送成功,此时是不会有输出语句。
就拿上面的例子来说,你如果长时间没有给快递小哥打电话,那么他就认为你已经收到快递了,如果你联系了快递小哥就会触发Return 退回机制
。
上面所列出来的Confrim 确认模式
和Return 退回模式
同时存在就可以保证生产者(发送方—》队列)过程中信息不会丢失。
现在要思考一个问题,如何保证队列中的信息能被消费者消费?
1、3 Consumer ACK
- 表示消费端收到消息后的确认方式。
- 有三种确认方式:自动确认,手动确认,根据异常情况确认(这种使用起来很麻烦,并且不常用,这里不做讲解)
- 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ
的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。 - 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
想象一下,如果队列中的信息在被消费者消费的时候,可能会出现异常,而这时你的业务功能还没有完成,那么就会导致该消息丢失。
1、3、1自动确认
在项目里再创建一个maven的comsumer项目
创建application.yml
文件
server:
port: 8001
spring:
rabbitmq:
host: 192.168.31.33
listener:
simple:
# 表示自动确认模式
acknowledge-mode: none
创建主启动类
@SpringBootApplication
public class ComsumerApp
public static void main(String[] args)
SpringApplication.run(ComsumerApp.class,args);
创建Listener消费者
@Component
public class MyListener
@RabbitListener(queues = "ban_queue_direct02")
public void listeber(Message message)
byte[] body = message.getBody();
String msg = new String(body);
System.out.println(msg);
//估计创建一个异常,测试是否可以消费掉队列中的信息
int c=10/0;
System.out.println("处理业务逻辑");
运行主启动类,发现报了异常,但同时也消费了队列中的信息。
这时可以理解成快递小哥把快递送到了,不管你有没有签收快递,他就认为你签收了快递,这种就是自动确认。
但是你可能因为各种原因,并没有收到这个快递,那么这个自动确认就很不好,这时就需要手动确认了。
1、3、2手动确认
修改application.yml
文件
listener:
simple:
# none表示自动确认模式
# manual表示手动确认模式
acknowledge-mode: manual
设置手动确认代码:
@Component
public class MyListener
@RabbitListener(queues = "ban_queue_direct02")
public void listeber(Message message, Channel channel)throws Exception
//获取信息的标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
byte[] body = message.getBody();
String msg = new String(body);
System.out.println(msg);
try
//手动创建异常
int c=10/0;
System.out.println("处理业务逻辑");
//消费端手动确认消息
//long deliveryTag, 表示的标识。
// boolean multiple:是否允许多确认
channel.basicAck(deliveryTag,true);//从队列中删除该消息。
catch (Exception e)
//(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。
channel.basicNack(deliveryTag,true,true);
此时运行代码,发现会一直获取队列中的信息,队列中的信息一直等待确认,并没有被消费掉。
把异常干掉,正常的运行一次。
同时队列中的信息也被消费掉了。
1、3、3 Consumer ACK小结
- 在RabbitListener-container标签中设置acknowledge属性,设置ack方式none:自动确认,manual:手动确认
- 如果在消费端没有出现异常,则调用
channel.basicAck(deliveryTag,true);
签收信息。 - 如果出现异常,则在catch中调用
channel.basicNack
,让MQ重新发送信息。
2、消息可靠性传递的总结
- 保证消息从发送者到交换机的可靠性: 使用Confirm确认机制。
- 保证消息从交换机到队列的可靠性; 使用return回退机制。
- 消息在队列中的可靠性。 设置队列和消息的持久化。
- 保证消息从队列到消费者的可靠性。 使用消费端的手动确认机制。
以上是关于RabbitMQ----消息可靠性传递的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ简介RabbitMQ 特点AMQP 概念生产者和消费者 Broker 服务节点Queue 队列Exchange 交换器如何保证消息的可靠性?