饿了么:基于AMQP实现的golang消息队列MaxQ
Posted 架构师之巅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了饿了么:基于AMQP实现的golang消息队列MaxQ相关的知识,希望对你有一定的参考价值。
背景
饿厂此前一直是重度rabbitmq使用者,在使用的过程中遭遇了大量的问题,性能问题、故障排查问题等。Rabbitmq是用erlang开发的,该语言过于小众,实在无力在其之上再做运维和开发。痛定思痛,我们于是决定自研一个消息队列,为了降低业务层的接入难度,所以该消息队列需要兼容AMQP协议,这样就可以在业务层完全无感知的情况下接入MaxQ。
什么是AMQP协议?
AMQP(Advanced Message Queuing Protocol),是一套消息队列的七层应用协议标准,由摩根大通和iMatrix在2004年开始着手制定,于2006年发布规范,目前最新版是AMQP 1.0,MaxQ基于AMQP 0.9.1实现。相比zeroMQ这类无Broker的模型,AMQP是一种有Broker的协议模型,也就是需要运行单独AMQP中间件服务,生产者客户端和消费者客户端通过AMQP SDK与AMQP中间件服务通讯。像kafka、JMS这类以topic为核心的Broker,生产者发送key和数据到Broker,由Broker比较key之后决定给那个消费者;而AMQP中淡化了topic的概念,引入了Exchange模型,生产者发送key和数据到Exchange,Exchange根据消费者订阅queue的路由规则路由到对应的queue,也就是AMQP解耦了key和queue,即解藕了生产者和消费者,使得生产者和消费者之间的关系更灵活,消费者可自由控制消费关系。另外,AMQP是同时支持消息Push和Pull的模型,对于Push模型,消费者还可通过设置Qos达到流控的目的。
下图是AMQP 0.9.1规范中给出的AMQP架构模型图:
下面简单介绍下AMQP中的一些基本概念:
Broker:接收和分发消息的应用,MaxQ就是基于AMQP协议实现的Message Broker。
Connection:publisher / consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
Channel:如果客户端每次和Broker通信都需要建议一条连接,在并大量连接并发的情况下建立TCP Connection的开销将是巨大的,效率也较低。于是,AMQP引入了channel的概念,channel是connection之上应用层建立的逻辑连接,Broker在实现中可创建单独的thread/协程来实现channel的并发,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个Broker提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。
Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue:消息最终会落到queue里,消息会由Broker push给消费者,或者消费者主动去pull queue上的消息;一个message可以被同时分发到多个queue中。
Binding:exchange和queue之间的消息路由策略,binding中可以包含routing key。Binding信息被保存到exchange中的路由表中,用于message的分发依据。
MaxQ - AMQP实现架构
MaxQ对AMQP协议的实现,主要做了以下几件事:
按照协议spec自动生产frame encode/decode,这里采用了golang的text/template包,将AMQP spec抽象成固定的json和对应的代码模版,如:
1{
2 "id": 50,
3 "methods": [{"id": 10,
4 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
5 {"type": "shortstr", "name": "queue", "default-value": ""},
6 {"type": "bit", "name": "passive", "default-value": false},
7 {"type": "bit", "name": "durable", "default-value": false},
8 {"type": "bit", "name": "exclusive", "default-value": false},
9 {"type": "bit", "name": "auto-delete", "default-value": false},
10 {"type": "bit", "name": "nowait", "default-value": false},
11 {"type": "table", "name": "arguments", "default-value": {}}],
12 "name": "declare",
13 "synchronous" : true},
14 {"id": 11,
15 "arguments": [{"type": "shortstr", "name": "queue"},
16 {"type": "long", "name": "message-count"},
17 {"type": "long", "name": "consumer-count"}],
18 "name": "declare-ok"},
19 {"id": 20,
20 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
21 {"type": "shortstr", "name": "queue", "default-value": ""},
22 {"type": "shortstr", "name": "exchange"},
23 {"type": "shortstr", "name": "routing-key", "default-value": ""},
24 {"type": "bit", "name": "nowait", "default-value": false},
25 {"type": "table", "name": "arguments", "default-value": {}}],
26 "name": "bind",
27 "synchronous" : true},
28 {"id": 21,
29 "arguments": [],
30 "name": "bind-ok"},
31 {"id": 30,
32 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
33 {"type": "shortstr", "name": "queue", "default-value": ""},
34 {"type": "bit", "name": "nowait", "default-value": false}],
35 "name": "purge",
36 "synchronous" : true},
37 {"id": 31,
38 "arguments": [{"type": "long", "name": "message-count"}],
39 "name": "purge-ok"},
40 {"id": 40,
41 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
42 {"type": "shortstr", "name": "queue", "default-value": ""},
43 {"type": "bit", "name": "if-unused", "default-value": false},
44 {"type": "bit", "name": "if-empty", "default-value": false},
45 {"type": "bit", "name": "nowait", "default-value": false}],
46 "name": "delete",
47 "synchronous" : true},
48 {"id": 41,
49 "arguments": [{"type": "long", "name": "message-count"}],
50 "name": "delete-ok"},
51 {"id": 50,
52 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
53 {"type": "shortstr", "name": "queue", "default-value": ""},
54 {"type": "shortstr", "name": "exchange"},
55 {"type": "shortstr", "name": "routing-key", "default-value": ""},
56 {"type": "table", "name": "arguments", "default-value": {}}],
57 "name": "unbind",
58 "synchronous" : true},
59 {"id": 51,
60 "arguments": [],
61 "name": "unbind-ok"}
62 ],
63 "name": "queue"
64},
代码生成模版:
1type {{$struct}} struct {
2 {{range .Fields}}
3 {{.Literal}} {{$.TypeOfFieldLiteral .}}
4 {{end}}
5 {{if .Content}}
6 // Content
7 properties *Properties
8 body []byte
9 {{end}}
10}
11// Name returns the string representation of the Method, implements
12// Method.Name().
13func (m *{{$struct}}) Name() string {
14 return "{{.NameLiteral $class.Name}}"
15}
16// ID returns the AMQP index number of the Method, implements Method.ID().
17func (m *{{$struct}}) ID() uint16 {
18 return {{.ID}}
19}
20// Class returns a instance of the Class of this method, implements Method.Class().
21func (m *{{$struct}}) Class() Class {
22 return &{{$classStruct}}{}
23}
24// String returns the string representation for the Method.
25func (m *{{$struct}}) String() string {
26 return {{.StringLiteral $class.Name}}
27}
Vhost API实现:
1func (v *VHost) QueueDeclare(node, name string, durable, exclusive, autoDelete bool, args amqp.Table) ...
2func (v *VHost) QueueInspect(name string) ...
3func (v *VHost) QueueBind(name, key, exchange string, args amqp.Table) ...
4func (v *VHost) QueueUnbind(name, key, exchange string, args amqp.Table) ...
5func (v *VHost) QueuePurge(name string) ...
6func (v *VHost) QueueDelete(name string, ifUnused, ifEmpty bool) ...
7func (v *VHost) ExchangeDeclare(name string, exType string, durable bool, autoDelete bool, internal bool, arguments amqp.Table)
8func (v *VHost) ExchangeDelete(name string, ifUnused bool) ...
9func (v *VHost) ExchangeBind(destination, key, source string, args amqp.Table) ...
10func (v *VHost) ExchangeUnbind(destination, key, source string, args amqp.Table) ...
11func (v *VHost) Publish(exchange, key string, mandatory, immediate bool, props *amqp.Properties, body []byte)
12
Exchange接口化,实现4种Exchange路由模式
1type publisher interface {
2 bind(b *Binding) (exists bool)
3 unbind(b *Binding)
4 bindingsCount() int
5 allBindings() []*Binding
6 publish(msg *Message, routingKey string) (count int, err error)
7}
8type directPublisher struct {
9 ...
10}
11type fanoutPublisher struct {
12 ...
13}
14type topicPublisher struct {
15 ...
16}
17type headersPublisher struct {
18 ...
19}
Queue接口化——MaxQ集群
Normal Queue: queue功能的具体实现,包括Publish、Consume、Cancel、Ack、Get等,单机版MaxQ会实例化此queue。
Master Queue: Normal Queue的超集,集群模式下会实例化此queue,在HA镜像策略下会与Slave Queue同步消息。
Virtual Queue: 负责远程调用Master Queue的API,主要是用作消息转发。
Slave Queue: Virtual Queue的超集,除了消息转发,还和Master Queue进行消息同步,在Master Queue down掉后,会被选取为新的Master Queue。
MaxQ - 生产实现架构
如果要将MaxQ应用到生产,还需要更多工作要做:
MaxQ集群化,集群间的元数据通过zookeeper存储和同步,消息通过grpc进行通信。
通过四层Proxy,生产者或消费者客户端可以采用官方或第三方的AMQP SDK与MaxQ集群通讯。
集群管理,由于集群信息和元数据信息都存储在zookeeper上,因此通过zookeeper可以实现集群节点管理、扩容缩容和集群切换;
同时MaxQ本身提供了HTTP API管理和统计接口,因此可对集群进行监控统计、资源分配等。
MaxQ相关特性
1. 消息可靠性
Publishing可靠性,生产者设置confirm服务端确认机制,确认服务端成功接收到生产者消息。
消息Routing可靠性,生产者设置Publish mandatory,确认消息路由到queue。
Consuming可靠性,消费者设置手工Ack,服务端在收到消息Ack后才清除本地消息。
Persisting可靠性, 采用RAID1存储持久化消息;
分布式下的可靠性,设置queue的镜像模式,启动Slave Queue,与Master Queue进行消息同步,在aster Queue down掉后,Slave Queue可被选举为Master Queue。
2. 容错性
zookeeper不可用
1. 元数据已缓存在内存中,不会有任何影响,生产方和消费方仍可正常生产和消费
2. 服务会自动降级,元数据不可变更
3. zookeeper恢复,服务自愈
节点故障
通过zookeeper进行Master Queue选举:
1.NodeA和NodeB收到NodeC挂掉的事件,NodeA和NodeB成为Master queue的候选节点
2.NodeA和NodeB各自上报同步的offset到zookeeper
3.NodeA和NodeB各自决策,offset最新的NodeA选为Master queue
4.NodeA将Master信息同步至zookeeper
5.NodeB更新新的Master信息,并同步数据
6.NodeC恢复,成为Slave queue,并与新的Master同步数据
网络分区
3. 扩展性
HA、Exchange和Queue动态扩展属性参数
Exchange、Binding、Queue支持自定义扩展, 如:x-message-ttl、x-expires、x-max-length、x-dead-letter-exchange
使用场景和案例
下面介绍下MaxQ作为消息队列的经典三种使用场景和使用案例:
1. 异步解耦
订单系统与消息通知系统解耦
1.用户订单支付成功,直接向MaxQ推送下单成功通知,主流程迅速返回
2.消息通知系统异步接收通知消息, 发送短信通知或应用通知
2. 削峰填谷
SQL-autoreview系统分析优化SQL语句,并将结果落DB,属于慢消费, 生产高峰期处理能力不够,可利用MaxQ的堆积能力,匀速消费和处理。
3. 发布订阅
DC数据变更发布和订阅
1.DRC将DC的数据变更记录发布至MaxQ
2.各业务系统订阅相关的数据变更,并进一步做业务处理
未来的展望
1. Sharding Queue支持Queue的水平扩展,让单Queue的性能不再成为瓶颈;
2. 支持消息巨量堆积,让消息堆积不再成为问题;
3. 延时队列,支持按单消息延时,让消息延时变的简单,无需再通过ttl+deadletter exchange做延时推送;
4. 历史消息trace,追溯查询已经消费掉的消息,让生产方和消费方不再因消息是否生产了或消费了而发生扯皮。
作者介绍:
张培培,2015年加入饿了么,现任饿了么框架工具部架构师,负责饿了么消息队列MaxQ。
以上是关于饿了么:基于AMQP实现的golang消息队列MaxQ的主要内容,如果未能解决你的问题,请参考以下文章
消息队列客户端开发向导二(基于 Spring 的 amqp 实现)