饿了么:基于AMQP实现的golang消息队列MaxQ

Posted 架构师之巅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了饿了么:基于AMQP实现的golang消息队列MaxQ相关的知识,希望对你有一定的参考价值。

背景

饿厂此前一直是重度rabbitmq使用者,在使用的过程中遭遇了大量的问题,性能问题、故障排查问题等。Rabbitmq是用erlang开发的,该语言过于小众,实在无力在其之上再做运维和开发。痛定思痛,我们于是决定自研一个消息队列,为了降低业务层的接入难度,所以该消息队列需要兼容AMQP协议,这样就可以在业务层完全无感知的情况下接入MaxQ。

什么是AMQP协议?

AMQP(Advanced Message Queuing Protocol),是一套消息队列的七层应用协议标准,由摩根大通和iMatrix在2004年开始着手制定,于2006年发布规范,目前最新版是AMQP 1.0,MaxQ基于AMQP 0.9.1实现。相比zeroMQ这类无Broker的模型,AMQP是一种有Broker的协议模型,也就是需要运行单独AMQP中间件服务,生产者客户端和消费者客户端通过AMQP SDK与AMQP中间件服务通讯。像kafka、JMS这类以topic为核心的Broker,生产者发送key和数据到Broker,由Broker比较key之后决定给那个消费者;而AMQP中淡化了topic的概念,引入了Exchange模型,生产者发送key和数据到Exchange,Exchange根据消费者订阅queue的路由规则路由到对应的queue,也就是AMQP解耦了key和queue,即解藕了生产者和消费者,使得生产者和消费者之间的关系更灵活,消费者可自由控制消费关系。另外,AMQP是同时支持消息Push和Pull的模型,对于Push模型,消费者还可通过设置Qos达到流控的目的。

下图是AMQP 0.9.1规范中给出的AMQP架构模型图:

饿了么:基于AMQP实现的golang消息队列MaxQ


下面简单介绍下AMQP中的一些基本概念:

  • Broker:接收和分发消息的应用,MaxQ就是基于AMQP协议实现的Message Broker。

  • Connection:publisher / consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。

  • Channel:如果客户端每次和Broker通信都需要建议一条连接,在并大量连接并发的情况下建立TCP Connection的开销将是巨大的,效率也较低。于是,AMQP引入了channel的概念,channel是connection之上应用层建立的逻辑连接,Broker在实现中可创建单独的thread/协程来实现channel的并发,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

  • Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个Broker提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。

  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。

  • Queue:消息最终会落到queue里,消息会由Broker push给消费者,或者消费者主动去pull queue上的消息;一个message可以被同时分发到多个queue中。

  • Binding:exchange和queue之间的消息路由策略,binding中可以包含routing key。Binding信息被保存到exchange中的路由表中,用于message的分发依据。


  • MaxQ - AMQP实现架构


饿了么:基于AMQP实现的golang消息队列MaxQ


MaxQ对AMQP协议的实现,主要做了以下几件事:

按照协议spec自动生产frame encode/decode,这里采用了golang的text/template包,将AMQP spec抽象成固定的json和对应的代码模版,如:

 1{
2    "id": 50,
3    "methods": [{"id": 10,
4                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
5                   {"type": "shortstr", "name": "queue", "default-value": ""},
6                               {"type": "bit", "name": "passive", "default-value": false},
7                               {"type": "bit", "name": "durable", "default-value": false},
8                               {"type": "bit", "name": "exclusive", "default-value": false},
9                               {"type": "bit", "name": "auto-delete", "default-value": false},
10                               {"type": "bit", "name": "nowait", "default-value": false},
11                               {"type": "table", "name": "arguments", "default-value": {}}],
12                 "name": "declare",
13                 "synchronous" : true},
14                {"id": 11,
15                 "arguments": [{"type": "shortstr", "name": "queue"},
16                               {"type": "long", "name": "message-count"},
17                               {"type": "long", "name": "consumer-count"}],
18                 "name": "declare-ok"},
19                {"id": 20,
20                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
21                               {"type": "shortstr", "name": "queue", "default-value": ""},
22                               {"type": "shortstr", "name": "exchange"},
23                               {"type": "shortstr", "name": "routing-key", "default-value": ""},
24                               {"type": "bit", "name": "nowait", "default-value": false},
25                               {"type": "table", "name": "arguments", "default-value": {}}],
26                 "name": "bind",
27                 "synchronous" : true},
28                {"id": 21,
29                 "arguments": [],
30                 "name": "bind-ok"},
31                {"id": 30,
32                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
33                               {"type": "shortstr", "name": "queue", "default-value": ""},
34                               {"type": "bit", "name": "nowait", "default-value": false}],
35                 "name": "purge",
36                 "synchronous" : true},
37                {"id": 31,
38                 "arguments": [{"type": "long", "name": "message-count"}],
39                 "name": "purge-ok"},
40                {"id": 40,
41                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
42                               {"type": "shortstr", "name": "queue", "default-value": ""},
43                               {"type": "bit", "name": "if-unused", "default-value": false},
44                               {"type": "bit", "name": "if-empty", "default-value": false},
45                               {"type": "bit", "name": "nowait", "default-value": false}],
46                 "name": "delete",
47                 "synchronous" : true},
48                {"id": 41,
49                 "arguments": [{"type": "long", "name": "message-count"}],
50                 "name": "delete-ok"},
51                {"id": 50,
52                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
53                               {"type": "shortstr", "name": "queue", "default-value": ""},
54                               {"type": "shortstr", "name": "exchange"},
55                               {"type": "shortstr", "name": "routing-key", "default-value": ""},
56                               {"type": "table", "name": "arguments", "default-value": {}}],
57                 "name": "unbind",
58                 "synchronous" : true},
59                {"id": 51,
60                 "arguments": [],
61                 "name": "unbind-ok"}
62                ],
63    "name": "queue"
64},


代码生成模版:

 1type {{$struct}} struct {
2    {{range .Fields}}
3        {{.Literal}} {{$.TypeOfFieldLiteral .}}
4    {{end}}
5    {{if .Content}}
6        // Content
7        properties *Properties
8        body []byte
9    {{end}}
10}
11// Name returns the string representation of the Method, implements
12// Method.Name().
13func (m *{{$struct}}) Name() string {
14    return "{{.NameLiteral $class.Name}}"
15}
16// ID returns the AMQP index number of the Method, implements Method.ID().
17func (m *{{$struct}}) ID() uint16 {
18    return {{.ID}}
19}
20// Class returns a instance of the Class of this method, implements Method.Class().
21func (m *{{$struct}}) Class() Class {
22    return &{{$classStruct}}{}
23}
24// String returns the string representation for the Method.
25func (m *{{$struct}}) String() string {
26    return {{.StringLiteral $class.Name}}
27}


Vhost API实现:

 1func (v *VHost) QueueDeclare(node, name string, durable, exclusive, autoDelete bool, args amqp.Table) ...
2func (v *VHost) QueueInspect(name string) ...
3func (v *VHost) QueueBind(name, key, exchange string, args amqp.Table) ...
4func (v *VHost) QueueUnbind(name, key, exchange string, args amqp.Table) ...
5func (v *VHost) QueuePurge(name string) ...
6func (v *VHost) QueueDelete(name string, ifUnused, ifEmpty bool) ...
7func (v *VHost) ExchangeDeclare(name string, exType string, durable bool, autoDelete bool, internal bool, arguments amqp.Table)
8func (v *VHost) ExchangeDelete(name string, ifUnused bool) ...
9func (v *VHost) ExchangeBind(destination, key, source string, args amqp.Table) ...
10func (v *VHost) ExchangeUnbind(destination, key, source string, args amqp.Table) ...
11func (v *VHost) Publish(exchange, key string, mandatory, immediate bool, props *amqp.Properties, body []byte)
12


Exchange接口化,实现4种Exchange路由模式

 1type publisher interface {
2    bind(b *Binding) (exists bool)
3    unbind(b *Binding)
4    bindingsCount() int
5    allBindings() []*Binding
6    publish(msg *Message, routingKey string) (count int, err error)
7}
8type directPublisher struct {
9 ...
10}
11type fanoutPublisher struct {
12 ...
13}
14type topicPublisher struct {
15 ...
16}
17type headersPublisher struct {
18 ...
19}


Queue接口化——MaxQ集群

  • Normal Queue: queue功能的具体实现,包括Publish、Consume、Cancel、Ack、Get等,单机版MaxQ会实例化此queue。

  • Master Queue: Normal Queue的超集,集群模式下会实例化此queue,在HA镜像策略下会与Slave Queue同步消息。

  • Virtual Queue: 负责远程调用Master Queue的API,主要是用作消息转发。

  • Slave Queue: Virtual Queue的超集,除了消息转发,还和Master Queue进行消息同步,在Master Queue down掉后,会被选取为新的Master Queue。


MaxQ - 生产实现架构

饿了么:基于AMQP实现的golang消息队列MaxQ


如果要将MaxQ应用到生产,还需要更多工作要做:

  1. MaxQ集群化,集群间的元数据通过zookeeper存储和同步,消息通过grpc进行通信。

  2. 通过四层Proxy,生产者或消费者客户端可以采用官方或第三方的AMQP SDK与MaxQ集群通讯。

  3. 集群管理,由于集群信息和元数据信息都存储在zookeeper上,因此通过zookeeper可以实现集群节点管理、扩容缩容和集群切换;

同时MaxQ本身提供了HTTP API管理和统计接口,因此可对集群进行监控统计、资源分配等。


MaxQ相关特性

1. 消息可靠性

  • Publishing可靠性,生产者设置confirm服务端确认机制,确认服务端成功接收到生产者消息。

  • 消息Routing可靠性,生产者设置Publish mandatory,确认消息路由到queue。

  • Consuming可靠性,消费者设置手工Ack,服务端在收到消息Ack后才清除本地消息。

  • Persisting可靠性, 采用RAID1存储持久化消息;

  • 分布式下的可靠性,设置queue的镜像模式,启动Slave Queue,与Master Queue进行消息同步,在aster Queue down掉后,Slave Queue可被选举为Master Queue。

2. 容错性

  • zookeeper不可用

饿了么:基于AMQP实现的golang消息队列MaxQ


1. 元数据已缓存在内存中,不会有任何影响,生产方和消费方仍可正常生产和消费

2. 服务会自动降级,元数据不可变更

3. zookeeper恢复,服务自愈


  • 节点故障

饿了么:基于AMQP实现的golang消息队列MaxQ


通过zookeeper进行Master Queue选举:

1.NodeA和NodeB收到NodeC挂掉的事件,NodeA和NodeB成为Master queue的候选节点

2.NodeA和NodeB各自上报同步的offset到zookeeper

3.NodeA和NodeB各自决策,offset最新的NodeA选为Master queue

4.NodeA将Master信息同步至zookeeper

5.NodeB更新新的Master信息,并同步数据

6.NodeC恢复,成为Slave queue,并与新的Master同步数据


  • 网络分区

饿了么:基于AMQP实现的golang消息队列MaxQ


3. 扩展性

饿了么:基于AMQP实现的golang消息队列MaxQ


  1. HA、Exchange和Queue动态扩展属性参数

  2. Exchange、Binding、Queue支持自定义扩展, 如:x-message-ttl、x-expires、x-max-length、x-dead-letter-exchange


使用场景和案例

下面介绍下MaxQ作为消息队列的经典三种使用场景和使用案例:

1. 异步解耦

饿了么:基于AMQP实现的golang消息队列MaxQ


订单系统与消息通知系统解耦

1.用户订单支付成功,直接向MaxQ推送下单成功通知,主流程迅速返回

2.消息通知系统异步接收通知消息, 发送短信通知或应用通知


2. 削峰填谷


SQL-autoreview系统分析优化SQL语句,并将结果落DB,属于慢消费, 生产高峰期处理能力不够,可利用MaxQ的堆积能力,匀速消费和处理。


3. 发布订阅

DC数据变更发布和订阅

1.DRC将DC的数据变更记录发布至MaxQ

2.各业务系统订阅相关的数据变更,并进一步做业务处理

未来的展望

1. Sharding Queue支持Queue的水平扩展,让单Queue的性能不再成为瓶颈;

2. 支持消息巨量堆积,让消息堆积不再成为问题;

3. 延时队列,支持按单消息延时,让消息延时变的简单,无需再通过ttl+deadletter exchange做延时推送;

4. 历史消息trace,追溯查询已经消费掉的消息,让生产方和消费方不再因消息是否生产了或消费了而发生扯皮。


作者介绍:

张培培,2015年加入饿了么,现任饿了么框架工具部架构师,负责饿了么消息队列MaxQ。


                                               


以上是关于饿了么:基于AMQP实现的golang消息队列MaxQ的主要内容,如果未能解决你的问题,请参考以下文章

AMQP Golang优先级不工作

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

RabbitMQ AMQP (高级消息队列协议)

饿了么:基于非阻塞Java的数据库中间件

Golang使用amqp发送消息