MQ-死信队列实现消息延迟
Posted 啃瓜子的松鼠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ-死信队列实现消息延迟相关的知识,希望对你有一定的参考价值。
死信队列实现消息延迟
一、延迟队列
延迟队列:消息进入到队列之后,延迟指定的时间才能被消费者消费。
AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能。
TTL就是消息的存活时间,RabbitMQ可以分别对队列和消息设置存活时间。
- 在创建队列的时候可以设置队列的存活时间,消息进入队列后,在存活时间内没有被消费者消费,则此消息会从当前队列移除。
- 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除。
- 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列。
二、使用延迟队列实现订单支付监控
- 实现流程图如图:
- 创建路由交换机
- 创建消息队列
- 创建死信队列
- 队列绑定
-
发送消息到交换机delay_exchange的k1(即消息队列delay_queue1)
//普通maven项目演示 //发送消息 public class SendMsg public static void main(String[] args) System.out.println("请输入消息:"); Scanner input = new Scanner(System.in); String msg = input.nextLine(); Connection connection = null; try //获取连接,相当于JDBC的获取数据库连接 connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //发消息之前开启消息确认 channel.confirmSelect(); channel.basicPublish("delay_exchange","k1",null,msg.getBytes()); //消息发送之后等待消息反馈 try boolean b = channel.waitForConfirms(); System.out.println("发送--->" + msg + (b ? "成功": "失败")); catch (InterruptedException e) e.printStackTrace(); //关闭 channel.close(); connection.close(); catch (IOException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace();
发送消息:
此时开启接收队列delay_queue2(而不是delay_queue1)的消息:会发现不能实时接收,需要等到delay_queue1的TTL时间到后才能成功接收到消息。
//普通maven项目演示
//接受消息
public class ReceiveMsg
public static void main(String[] args)
Connection connection = null;
//获取连接,相当于JDBC的获取数据库连接
try
connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
//声明要关注的队列
//channel.queueDeclare("queue1", false, false, false, null);
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("consumer2消费消息:'" + message + "'");
;
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume("delay_queue2", true, consumer);
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
由于在前面创建死信队列设置的delay_queue1的TTL时间为10s,因此间隔10s后成功接收到消息:
演示完毕!
以上是关于MQ-死信队列实现消息延迟的主要内容,如果未能解决你的问题,请参考以下文章