一、基础概念
1、理解消息通信
AMQP定义,高级消息队列协议。
优点。可靠性、解耦(请求和操作)以及实时消息通信。
消费者、生产者和代理。
消息持久化策略。通过配置项选择是否持久化的到磁盘,性能与可靠性的权衡,发送方确认。
消息的生命周期
生产者:
- 连接到MQ
- 获取信道
- 声明交换器(交换器类型,路由键)
- 创建消息
- 发布消息
- 关闭信道、关闭连接
消费者:
- 连接到MQ
- 获取信道
- 声明交换器(交换器类型)
- 声明队列
- 绑定(把队列与交换器进行绑定)
- (接收消息)消费消息
- 关闭信道、关闭连接
2、AMQP关键组件
交换器、队列、绑定。
各个组件之间的联系是:
- 根据绑定规则将队列绑定到交换器。
- 消息发布到交换器。
- 基于消息的路由键和交换类型,服务器将消息投递到对应的队列。
发布与订阅
basic.consume命令订阅,信道置为接收模式。
basic.publish命令发布,信道设置为接收模式。
3、消息分发
交换器类型
- direct
- fanout
- topic
消息分发机制
- direct交换器的消息分发,是以队列名作为路由键。生产者与队列之间是一对一的消息分发模式。
- fanout交换器的消息分发,一对多的广播消息。
- topic交换器的消息分发,通过路由键进行正则匹配进而分发消息。可以实现多对多的消息分发。
事实上,由于topic交换器的匹配规则囊括了一对一、一对多、多对多的消息派发机制,我们在进行消息通信时,几乎全部使用的是这种交换器。
二、解耦,解耦,解耦
1、分离请求和动作
2、自动轮询
配置更为复杂的路由方案,实现轮询负载均衡。
通过简单的增加消费者实现弹性伸缩。
3、语言无关
三、Rpc模式的使用
使用reply_to实现Rpc
私有队列:exclusive、auto_delete
流程图
关键点
- 幂等性。
- 响应队列的复用。
- 异常处理。主要包括请求超时、连接不到服务器等异常情况的处理。
框架选型
我使用的.net core编写的测试。
在开始的时候参考官方API,通过覆写SimpleRpcClient和SimpleServer实现了Rpc调用。
但是很快我发现像快速实现一个包含异常处理、日记记录、消息确保、支持发布/订阅以及Rpc的框架,对我来说还是很困难的。
大概比较了现在成熟的消息队列框架,最后选择MassTransit。
理由如下
- 开源。不解释了
- 文档比较全、社区活跃。一般的使用方法和问题解答,看文档就可以解决,很适合快速上手开发;遇到问题一般提问或看以前的issues都会有解答,当然不行了咋还有源码不是。
- 扩展性好,很多功能都是以扩展包的形式提供。在不满足需求的情况下,可以自己写扩展实现。
我测试对比了手写rpc与框架的rpc方式,速度差不多都是只有第一次调用的时候用了大概几百毫秒,之后几毫秒。性能还是非常不错的。