AMQP协议和rabbitMQ
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AMQP协议和rabbitMQ相关的知识,希望对你有一定的参考价值。
参考技术A AMQP,即 高级消息队列协议 (Advanced Message Queuing Protocol),是一个消息中间件应用层协议,用于组件之间的解耦,来提供 统一消息服务。主要功能是 排序消息,路由消息(包括点对点和订阅-发布),保证消息的可靠性和安全性。
遵循AMPQ协议的客户端,都能通过 消息中间件 相互通信。这样 客户端 就可以采用不同的开发语言实现,彼此无强依赖关系,降低客户端复杂性,提高开发效率也利于后期维护。
AMQP 的模型架构如下:
rabbitMQ是AMQP协议的一个开源实现。架构模型同样可以用以下的图来表示:
如上图,simple模式,单个publisher,单个queue,单个consumer
如上图,work模式
多个consumer共用一个queue的message
此种模式下,rabbitMQ会自动做负载均衡,将消息轮询发送给各个消费者,即一个消息只能被一个消费者获取
如上图,publish / subscribe 发布订阅模式(广播模式)
相对前2种模式,多了一个 exchange (type为fanout) ,message先发送到exchange,exchange再分别发送到对应的所有queue。而consumer订阅自己的queue,在自己订阅的queue上消费message。
示例应用场景,如下图示:
比如 网上购物,下单支付成功后,通知用户的方式有许多种,app推送,短信,email 等等。
message到来后被exchange发送到3个queue(app推送q,短信q,email_q)
之后 app推送服务,短信通知服务,email通知服务 从各自订阅的queue获取消息,通知用户支付成功
如上图示,exchange类型设定为direct
此时 message中的rountingKey 和 exchange中的bindingKey匹配,两者相等则发送对应的queue中,如果匹配不到bindingKey,则丢弃该message。
示例应用场景,如下图示:
比如服务产生的日志,日志有许多类型,error,info,debuf等类型的日志,而我们的需求只想要将 error 类型的日志写入磁盘,就可以用routing模式,将error日志路由到error queue,再由相应的 写入磁盘服务获取message,写入磁盘
如上图示,exchange类型为topic,相对于第4种模式,相同点是都根据 rountingKey 匹配,不同点是 topic 模式支持模糊匹配。
MQ-死信队列实现消息延迟
死信队列实现消息延迟
一、延迟队列
延迟队列:消息进入到队列之后,延迟指定的时间才能被消费者消费。
AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能。
TTL就是消息的存活时间,RabbitMQ可以分别对队列和消息设置存活时间。
- 在创建队列的时候可以设置队列的存活时间,消息进入队列后,在存活时间内没有被消费者消费,则此消息会从当前队列移除。
- 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除。
- 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列。
二、使用延迟队列实现订单支付监控
- 实现流程图如图:
- 创建路由交换机
- 创建消息队列
- 创建死信队列
- 队列绑定
-
发送消息到交换机delay_exchange的k1(即消息队列delay_queue1)
//普通maven项目演示 //发送消息 public class SendMsg public static void main(String[] args) System.out.println("请输入消息:"); Scanner input = new Scanner(System.in); String msg = input.nextLine(); Connection connection = null; try //获取连接,相当于JDBC的获取数据库连接 connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); //发消息之前开启消息确认 channel.confirmSelect(); channel.basicPublish("delay_exchange","k1",null,msg.getBytes()); //消息发送之后等待消息反馈 try boolean b = channel.waitForConfirms(); System.out.println("发送--->" + msg + (b ? "成功": "失败")); catch (InterruptedException e) e.printStackTrace(); //关闭 channel.close(); connection.close(); catch (IOException e) e.printStackTrace(); catch (TimeoutException e) e.printStackTrace();
发送消息:
此时开启接收队列delay_queue2(而不是delay_queue1)的消息:会发现不能实时接收,需要等到delay_queue1的TTL时间到后才能成功接收到消息。
//普通maven项目演示
//接受消息
public class ReceiveMsg
public static void main(String[] args)
Connection connection = null;
//获取连接,相当于JDBC的获取数据库连接
try
connection = MQUtil.getConnection();
Channel channel = connection.createChannel();
//声明要关注的队列
//channel.queueDeclare("queue1", false, false, false, null);
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("consumer2消费消息:'" + message + "'");
;
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume("delay_queue2", true, consumer);
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
由于在前面创建死信队列设置的delay_queue1的TTL时间为10s,因此间隔10s后成功接收到消息:
演示完毕!
以上是关于AMQP协议和rabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章