并发之消息队列----基于AMQP实现的golang消息队列MaxQ

Posted 牛牛技术资源社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发之消息队列----基于AMQP实现的golang消息队列MaxQ相关的知识,希望对你有一定的参考价值。

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。



消息被处理的过程相当于流程A被处理。我们这里以一个实际的模型来讨论下,比如用户下单成功时给用户发短信,如果没有这个消息队列,我们会选择同步调用发短信的接口,

并等待短息发送成功,这时候假设短信接口实现出现问题了,或者短信调用端超时了,又或者短信发送达到上限了,我们是选择重试几次还是放弃,还是选择把这个放到数据库

过一段时间再看看呢,不管怎样,实现都很复杂。

我们可以将发短信这个请求放在消息队列里,消息队列按照一定的顺序挨个处理队列里的消息,当处理到发送短信的任务时,通知短信服务发送消息,如果出现之前出现的问题,那么把这个消息重新放到消息队列中。

消息队列的好处:

1.成功完成了一个异步解耦的过程。短信发送时只要保证放到消息队列中就可以了,接着做后面的事情就行。一个事务只关心本质的流程,需要依赖其他事情但是不那么重要的时候,有通知即可,无需等待结果。每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系。

对于我们的订单系统,订单最终支付成功之后可能需要给用户发送短信积分什么的,但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢(比如短信网关速度不好),那么主流程的时间会加长很多,用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”,不一定非要等待它处理完成。

2.保证了最终一致性,通过在队列中存放任务保证它最终一定会执行。

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转账过程来理解最终一致性,转账的需求很简单,如果A系统扣钱成功,则B系统加钱一定成功。反之则一起回滚,像什么都没发生一样。
然而,这个过程中存在很多可能的意外:

  1. A扣钱成功,调用B加钱接口失败。

  2. A扣钱成功,调用B加钱接口虽然成功,但获取最终结果时网络异常引起超时。

  3. A扣钱成功,B加钱失败,A想回滚扣的钱,但A机器down机。

可见,想把这件看似简单的事真正做成,真的不那么容易。所有跨VM的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性,分布式事务,但落地太难且成本太高,后文会具体提到。

  2. 最终一致性,主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。
    回到刚才的例子,系统在A扣钱成功的情况下,把要给B“通知”这件事记录在库里(为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里),通知成功则删除这条记录,通知失败或不确定则依靠定时任务补偿性地通知我们,直到我们把状态更新成正确的为止。

3.广播

消息队列的基本功能之一是进行广播。如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

3.提速。假设我们还需要发送邮件,有了消息队列就不需要同步等待,我们可以直接并行处理,而下单核心任务可以更快完成。增强业务系统的异步处理能力。甚至几乎不可能出现并发现象。

4.削峰和流控。不对于不需要实时处理的请求来说,当并发量特别大的时候,可以先在消息队列中作缓存,然后陆续发送给对应的服务去处理

试想上下游对于事情的处理能力是不同的。比如,Web前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。
这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。
支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

消息队列的使用场景:

主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。

使用场景的话,举个例子:
假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:

  1. 校验用户名等信息,如果没问题会在数据库中添加一个用户记录

  2. 如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信

  3. 分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他

  4. 发送给用户一个包含操作指南的系统通知

  5. 等等……

但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。
或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。

并发之消息队列----基于AMQP实现的golang消息队列MaxQ



为什么需要消息队列?
生产和消费的速度或者稳定性不一致。

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


Kafka的介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
Kafka 有如下特性:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。

  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。

  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。

  • 同时支持离线数据处理和实时数据处理。

  • Scale out:支持在线水平扩展。

kafka的术语

  • Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。

  • Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。

  • Producer:负责发布消息到Kafka broker。

  • Consumer:消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


下面来介绍RabbitMQ里的一些基本定义,主要如下:
RabbitMQ Server:提供消息一条从Producer到Consumer的处理。
Exchange:一边从发布者方接收消息,一边把消息推送到队列。
producer只能将消息发送给exchange。而exchange负责将消息发送到queues。Procuder Publish的Message进入了exchange,exchange会根据routingKey处理接收到的消息,判断消息是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的主要的type有direct,topic,headers,fanout。具体针对不同的场景使用不同的type。
queue也是通过这个routing keys来做的绑定。交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
Queue:消息队列。接收来自exchange的消息,然后再由consumer取出。exchange和queue可以一对一,也可以一对多,它们的关系通过routingKey来绑定。
Producer:Client A & B,生产者,消息的来源,消息必须发送给exchange。而不是直接给queue
Consumer:Client 1,2,3消费者,直接从queue中获取消息进行消费,而不是从exchange中获取消息进行消费。

背景

饿厂此前一直是重度rabbitmq使用者,在使用的过程中遭遇了大量的问题,性能问题、故障排查问题等。Rabbitmq是用erlang开发的,该语言过于小众,实在无力在其之上再做运维和开发。痛定思痛,我们于是决定自研一个消息队列,为了降低业务层的接入难度,所以该消息队列需要兼容AMQP协议,这样就可以在业务层完全无感知的情况下接入MaxQ。

什么是AMQP协议?

AMQP(Advanced Message Queuing Protocol),是一套消息队列的七层应用协议标准,由摩根大通和iMatrix在2004年开始着手制定,于2006年发布规范,目前最新版是AMQP 1.0,MaxQ基于AMQP 0.9.1实现。相比zeroMQ这类无Broker的模型,AMQP是一种有Broker的协议模型,也就是需要运行单独AMQP中间件服务,生产者客户端和消费者客户端通过AMQP SDK与AMQP中间件服务通讯。像kafka、JMS这类以topic为核心的Broker,生产者发送key和数据到Broker,由Broker比较key之后决定给那个消费者;而AMQP中淡化了topic的概念,引入了Exchange模型,生产者发送key和数据到Exchange,Exchange根据消费者订阅queue的路由规则路由到对应的queue,也就是AMQP解耦了key和queue,即解藕了生产者和消费者,使得生产者和消费者之间的关系更灵活,消费者可自由控制消费关系。另外,AMQP是同时支持消息Push和Pull的模型,对于Push模型,消费者还可通过设置Qos达到流控的目的。

下图是AMQP 0.9.1规范中给出的AMQP架构模型图:

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


下面简单介绍下AMQP中的一些基本概念:

  • Broker:接收和分发消息的应用,MaxQ就是基于AMQP协议实现的Message Broker。

  • Connection:publisher / consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。

  • Channel:如果客户端每次和Broker通信都需要建议一条连接,在并大量连接并发的情况下建立TCP Connection的开销将是巨大的,效率也较低。于是,AMQP引入了channel的概念,channel是connection之上应用层建立的逻辑连接,Broker在实现中可创建单独的thread/协程来实现channel的并发,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

  • Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个Broker提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。

  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。

  • Queue:消息最终会落到queue里,消息会由Broker push给消费者,或者消费者主动去pull queue上的消息;一个message可以被同时分发到多个queue中。

  • Binding:exchange和queue之间的消息路由策略,binding中可以包含routing key。Binding信息被保存到exchange中的路由表中,用于message的分发依据。

MaxQ - AMQP实现架构


并发之消息队列----基于AMQP实现的golang消息队列MaxQ


MaxQ对AMQP协议的实现,主要做了以下几件事:

按照协议spec自动生产frame encode/decode,这里采用了golang的text/template包,将AMQP spec抽象成固定的json和对应的代码模版,如:

{
    "id": 50,
    "methods": [{"id": 10,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                   {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "bit", "name": "passive", "default-value": false},
                               {"type": "bit", "name": "durable", "default-value": false},
                               {"type": "bit", "name": "exclusive", "default-value": false},
                               {"type": "bit", "name": "auto-delete", "default-value": false},
                               {"type": "bit", "name": "nowait", "default-value": false},
                               {"type": "table", "name": "arguments", "default-value": {}}],
                 "name": "declare",
                 "synchronous" : true},
                {"id": 11,
                 "arguments": [{"type": "shortstr", "name": "queue"},
                               {"type": "long", "name": "message-count"},
                               {"type": "long", "name": "consumer-count"}],
                 "name": "declare-ok"},
                {"id": 20,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "shortstr", "name": "exchange"},
                               {"type": "shortstr", "name": "routing-key", "default-value": ""},
                               {"type": "bit", "name": "nowait", "default-value": false},
                               {"type": "table", "name": "arguments", "default-value": {}}],
                 "name": "bind",
                 "synchronous" : true},
                {"id": 21,
                 "arguments": [],
                 "name": "bind-ok"},
                {"id": 30,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "bit", "name": "nowait", "default-value": false}],
                 "name": "purge",
                 "synchronous" : true},
                {"id": 31,
                 "arguments": [{"type": "long", "name": "message-count"}],
                 "name": "purge-ok"},
                {"id": 40,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "bit", "name": "if-unused", "default-value": false},
                               {"type": "bit", "name": "if-empty", "default-value": false},
                               {"type": "bit", "name": "nowait", "default-value": false}],
                 "name": "delete",
                 "synchronous" : true},
                {"id": 41,
                 "arguments": [{"type": "long", "name": "message-count"}],
                 "name": "delete-ok"},
                {"id": 50,
                 "arguments": [{"type": "short", "name": "ticket", "default-value": 0},
                               {"type": "shortstr", "name": "queue", "default-value": ""},
                               {"type": "shortstr", "name": "exchange"},
                               {"type": "shortstr", "name": "routing-key", "default-value": ""},
                               {"type": "table", "name": "arguments", "default-value": {}}],
                 "name": "unbind",
                 "synchronous" : true},
                {"id": 51,
                 "arguments": [],
                 "name": "unbind-ok"}
                ],
    "name": "queue"},

代码生成模版:

type {{$struct}} struct {
    {{range .Fields}}
        {{.Literal}} {{$.TypeOfFieldLiteral .}}
    {{end}}
    {{if .Content}}
        // Content
        properties *Properties
        body []byte
    {{end}}}// Name returns the string representation of the Method, implements// Method.Name().func (m *{{$struct}}) Name() string {
    return "{{.NameLiteral $class.Name}}"}// ID returns the AMQP index number of the Method, implements Method.ID().func (m *{{$struct}}) ID() uint16 {
    return {{.ID}}}// Class returns a instance of the Class of this method, implements Method.Class().func (m *{{$struct}}) Class() Class {
    return &{{$classStruct}}{}}// String returns the string representation for the Method.func (m *{{$struct}}) String() string {
    return {{.StringLiteral $class.Name}}}


Vhost API实现:

func (v *VHost) QueueDeclare(node, name string, durable, exclusive, autoDelete bool, args amqp.Table) ...func (v *VHost) QueueInspect(name string) ...func (v *VHost) QueueBind(name, key, exchange string, args amqp.Table) ...func (v *VHost) QueueUnbind(name, key, exchange string, args amqp.Table) ...func (v *VHost) QueuePurge(name string) ...func (v *VHost) QueueDelete(name string, ifUnused, ifEmpty bool) ...func (v *VHost) ExchangeDeclare(name string, exType string, durable bool, autoDelete bool, internal bool, arguments amqp.Table)func (v *VHost) ExchangeDelete(name string, ifUnused bool) ...func (v *VHost) ExchangeBind(destination, key, source string, args amqp.Table) ...func (v *VHost) ExchangeUnbind(destination, key, source string, args amqp.Table) ...func (v *VHost) Publish(exchange, key string, mandatory, immediate bool, props *amqp.Properties, body []byte) 


Exchange接口化,实现4种Exchange路由模式

// Exchange publishertype publisher interface {
    bind(b *Binding) (exists bool)
    unbind(b *Binding)
    bindingsCount() int
    allBindings() []*Binding
    publish(msg *Message, routingKey string) (count int, err error)}
 type directPublisher struct {
 ...}
  type fanoutPublisher struct {
 ...}
  type topicPublisher struct {
 ...}
  type headersPublisher struct {
 ...}


Queue接口化——MaxQ集群

  • Normal Queue: queue功能的具体实现,包括Publish、Consume、Cancel、Ack、Get等,单机版MaxQ会实例化此queue。

  • Master Queue: Normal Queue的超集,集群模式下会实例化此queue,在HA镜像策略下会与Slave Queue同步消息。

  • Virtual Queue: 负责远程调用Master Queue的API,主要是用作消息转发。

  • Slave Queue: Virtual Queue的超集,除了消息转发,还和Master Queue进行消息同步,在Master Queue down掉后,会被选取为新的Master Queue。


MaxQ - 生产实现架构

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


如果要将MaxQ应用到生产,还需要更多工作要做:

  1. MaxQ集群化,集群间的元数据通过zookeeper存储和同步,消息通过grpc进行通信。

  2. 通过四层Proxy,生产者或消费者客户端可以采用官方或第三方的AMQP SDK与MaxQ集群通讯。

  3. 集群管理,由于集群信息和元数据信息都存储在zookeeper上,因此通过zookeeper可以实现集群节点管理、扩容缩容和集群切换;

同时MaxQ本身提供了HTTP API管理和统计接口,因此可对集群进行监控统计、资源分配等。


MaxQ相关特性

1. 消息可靠性

  • Publishing可靠性,生产者设置confirm服务端确认机制,确认服务端成功接收到生产者消息。

  • 消息Routing可靠性,生产者设置Publish mandatory,确认消息路由到queue。

  • Consuming可靠性,消费者设置手工Ack,服务端在收到消息Ack后才清除本地消息。

  • Persisting可靠性, 采用RAID1存储持久化消息;

  • 分布式下的可靠性,设置queue的镜像模式,启动Slave Queue,与Master Queue进行消息同步,在aster Queue down掉后,Slave Queue可被选举为Master Queue。

2. 容错性

  • zookeeper不可用

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


1. 元数据已缓存在内存中,不会有任何影响,生产方和消费方仍可正常生产和消费

2. 服务会自动降级,元数据不可变更

3. zookeeper恢复,服务自愈


  • 节点故障

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


通过zookeeper进行Master Queue选举:

1.NodeA和NodeB收到NodeC挂掉的事件,NodeA和NodeB成为Master queue的候选节点

2.NodeA和NodeB各自上报同步的offset到zookeeper

3.NodeA和NodeB各自决策,offset最新的NodeA选为Master queue

4.NodeA将Master信息同步至zookeeper

5.NodeB更新新的Master信息,并同步数据

6.NodeC恢复,成为Slave queue,并与新的Master同步数据


  • 网络分区

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


3. 扩展性

并发之消息队列----基于AMQP实现的golang消息队列MaxQ


  1. HA、Exchange和Queue动态扩展属性参数

  2. Exchange、Binding、Queue支持自定义扩展, 如:x-message-ttl、x-expires、x-max-length、x-dead-letter-exchange


使用场景和案例

下面介绍下MaxQ作为消息队列的经典三种使用场景和使用案例:

1. 异步解耦


订单系统与消息通知系统解耦

1.用户订单支付成功,直接向MaxQ推送下单成功通知,主流程迅速返回

2.消息通知系统异步接收通知消息, 发送短信通知或应用通知


2. 削峰填谷


SQL-autoreview系统分析优化SQL语句,并将结果落DB,属于慢消费, 生产高峰期处理能力不够,可利用MaxQ的堆积能力,匀速消费和处理。


3. 发布订阅

DC数据变更发布和订阅

1.DRC将DC的数据变更记录发布至MaxQ

2.各业务系统订阅相关的数据变更,并进一步做业务处理


未来的展望

1. Sharding Queue支持Queue的水平扩展,让单Queue的性能不再成为瓶颈;

2. 支持消息巨量堆积,让消息堆积不再成为问题;

3. 延时队列,支持按单消息延时,让消息延时变的简单,无需再通过ttl+deadletter exchange做延时推送;

4. 历史消息trace,追溯查询已经消费掉的消息,让生产方和消费方不再因消息是否生产了或消费了而发生扯皮。


以上是关于并发之消息队列----基于AMQP实现的golang消息队列MaxQ的主要内容,如果未能解决你的问题,请参考以下文章

饿了么:基于AMQP实现的golang消息队列MaxQ

快速入门分布式消息队列之 RabbitMQ(上)

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

AMQP消息队列之RabbitMQ简单示例

Spring Boot异步消息之AMQP讲解及实战(附源码)