消息队列,高并发的救火员

Posted 毛奇志

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列,高并发的救火员相关的知识,希望对你有一定的参考价值。

一、前言

二、消息队列三功能

消息队列定义:消息队列就是一个使用队列来通信的组件,消息队列常常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题。

从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥10亿日活的微信。我们需要有一个东西来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。消息队列就应运而生了。它常用来实现:异步处理、服务解耦、流量控制。

2.1 异步通信

随着公司的发展你可能会发现你项目的请求链路越来越长,例如刚开始的电商项目,可以就是粗暴的扣库存、下单。慢慢地又加上积分服务、短信服务等。这一路同步调用下来客户可能等急了,这时候就是消息队列登场的好时机。

调用链路长、响应就慢了,并且相对于扣库存和下单,积分和短信没必要这么的 “及时”。因此只需要在下单结束那个流程,扔个消息到消息队列中就可以直接返回响应了。而且积分服务和短信服务可以并行的消费这条消息。

可以看出消息队列可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能。

从同步处理到使用消息队列异步处理
优点:有了异步处理之后,在业务模块增加的情况下,保护主功能响应速度不受到影响。
缺点:
(1) 结构复杂:架构复杂一级,稳定性差一级,运维成本增加一级;
(2) 消息延迟消息队列的消息通知机制有延迟(生产者放入消息,消费者取出消息),失去非核心模块(即核心模块的下流模块)的实时性,但是可以接受。
小结:对于任何一个技术,优点大于缺点,就可以选用。

2.2 服务解耦

上面我们说到加了积分服务和短信服务,这时候可能又要来个营销服务,之后领导又说想做个大数据,又来个数据分析服务等等。可以发现订单的下游系统在不断的扩充,为了迎合这些下游系统订单服务需要经常地修改,任何一个下游系统接口的变更可能都会影响到订单服务。所以一般会选用消息队列来解决系统之间耦合的问题,订单服务把订单相关消息塞到消息队列中,下游系统谁要谁就订阅这个主题。这样订单服务就解放啦!

问题:多线程也可以实现并发,为什么一定要使用消息队列?
回答:因为消息队列可以在模块层面解耦,而多线程无法做到。

2.3 流量控制

消息队列的流量控制又称为削峰填谷,后端服务相对而言都是比较弱的,因为业务较重,处理时间较长。像一些例如秒杀活动爆发式流量打过来可能就顶不住了。因此需要引入一个中间件来做缓冲,消息队列再适合不过了。

对于生产者生产过快:将网关的请求先放入消息队列中,后端服务尽自己最大能力去消息队列中消费请求,超时的请求可以直接返回错误。

对于消费者消费过慢:某些后台任务,不需要及时地响应,并且业务处理复杂且流程长,那么过来的请求先放入消息队列中,后端服务按照自己的节奏处理。这也是很 nice 的。

上面两种情况分别对应着生产者生产过快和消费者消费过慢两种情况,消息队列都能在其中发挥很好的缓冲效果。

流量控制(削峰填谷)本质:将不均匀的用户请求到消息队列中过一遍,变成了后端能够处理的、最大的均匀的请求。

三、消息队列的两种模型

3.1 队列模型

生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者, 但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。

3.2 发布/订阅模型

为了解决一条消息能被多个消费者消费的问题,发布/订阅模型就来了。该模型是将消息发往一个Topic即主题中,所有订阅了这个Topic的订阅者都能消费这条消息。

注意,队列模型通过多队列全量存储相同的消息,即数据的冗余也可以实现一条消息被多个消费者消费。RabbitMQ 就是采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题。

3.3 小结

队列模型 和 发布/订阅模型
第一,两者定义:队列模型每条消息只能被一个消费者消费,而发布/订阅模型就是为让一条消息可以被多个消费者消费而生的,当然队列模型也可以通过消息全量存储,然后发送至多个队列来实现一条消息被多个消费者消费问题,但是会有数据的冗余,需要通过接口幂等性避免重复消费。
第二,两者联系:队列模式是发布/订阅模型的特例,发布/订阅模型兼容队列模型,即只有一个消费者的情况下和队列模型基本一致。
第三,两者应用:RabbitMQ 采用队列模型,RocketMQ和Kafka 采用发布/订阅模型。

接下来的内容都基于发布/订阅模型,因为下面”消息队列“都是基于RocketMQ/Kafka来说的,这两个都是发布/订阅模式。

四、消息队列的三个独有设置

4.1 Producer、Broker与Consumer

定义(producer broker consumer):发送消息方为生产者 Producer,消息队列中存储消息的是Broker节点,消费消息方为消费者Consumer。

整个过程就是消息从Producer发往Broker,Broker将消息存储至本地,然后Consumer从Broker拉取消息,或者Broker推送消息至Consumer,最后消费。

4.2 使用队列或分区提高并发度

为了提高并发度,往往发布/订阅模型还会引入队列或者分区的概念(RocketMQ中叫队列,Kafka中叫分区),即消息是发往一个主题下的某个队列或者某个分区中,RocketMQ中叫队列,Kafka叫分区,本质一样。

问题:如何使用队列或分区提高broker并发度(RocketMQ中叫队列,Kafka叫分区)?
回答:例如某个主题topic下有 5 个队列,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息。

问题:既然一个broker可以有多个队列或多个分区(RocketMQ中叫队列,Kafka叫分区),如何均匀分配?
回答:遵循平均分配的原则。producer生产的消息,如果只分配到一个队列或分区中,那么和没使用多个队列/多个分区一个样,所以,一定要平均分配。遵循平均分配的原则,一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。

4.3 消费者组

与之对应的消费者一般都有组的概念 Consumer Group, 即消费者一定是属于某个消费组的。一条消息会发往多个订阅了这个主题topic的消费组。

问题:一条消息会发往多个订阅了这个主题topic的消费组?
回答:一个消费者组中三个消费者,一个broker。
情况1:正常情况,一个消费者组中三个消费者对应三个partition,那么每个消费者消费一个partition
情况2:如果消费者组中的某个消费者挂了,则一个消费者组中,两个消费者对应三个partition,那么其中一个消费者可能就要消费两个partition了
情况3:如果只有三个partition,而消费者组有4个消费者,则一个消费者组中,四个消费者对应三个partition,那么一个消费者会空闲,没有东西可以消费
情况4:如果多加入一个消费者组,则两个消费者组对应三个partition,无论是新增的消费者组还是原本的消费者组,都能消费topic的全部数据(即消费者组之间从逻辑上它们是独立的,一条消息会发往多个订阅了这个主题的消费组)。

所以,现在有两个消费组分别是Group 1 和 Group 2,它们都订阅了Topic-a。此时有一条消息发往Topic-a,那么这两个消费组都能接收到这条消息。然后这条消息实际是写入Topic某个队列中,消费组中的某个消费者对应消费一个队列的消息。在物理上除了副本拷贝之外,一条消息在Broker中只会有一份,这就是RocketMQ/Kafka的高可用。

4.4 消费者的offset

每个消费组会有自己的offset(即消费点位)来标识消费到的位置,在消费点位之前的消息表明已经消费过了,在消费点位之后的消息表明还没有消费。当然这个offset是队列级别的。每个消费组都会维护订阅的Topic下的每个队列的offset。如下图:

对于上图的解释,从两个方面来看offset:
(1) 从队列/分区来说:offset是队列/分区级别的,一个队列/分区会维护各个消费者的offset;
(2) 从消费者组来说:offset是consumer级别的,每个消费者组的消费者,都会维护订阅的Topic下的每个队列的offset,上图中四个消费者,就有四个offset。
小结:同一个topic下,不同队列有不同offset;同一个topic同一个队列下,不同consumer不同offset。

五、消息队列五个问题

5.1 消息丢失问题

就我们市面上常见的消息队列而言,只要配置得当,我们的消息就不会丢。先来看看这个图,

我们来解释一下消息三个阶段的关系:
(1) 图中的 “生产消息 ”:可以说上图有对应,也可以说上图没有对应,对应生产消息,producer需要使用try…catch…处理好Broker的响应,出错情况下利用重试、报警等手段。
(2) 图中的 “收到啦”:对应上图中存储消息,Broker需要控制响应的时机,单机情况下是消息刷盘后返回响应,集群多副本情况下,即发送至两个副本及以上的情况下再返回响应,从而保证“存储消息”过程中,消息不丢失。
(3) 图中的 “消费消息”:没有对应。
(4) 图中的 “消费好啦”:对应消费消息,consumer需要在执行完真正的业务逻辑之后再返回响应给Broker,从而保证“消费消息”过程中,消息不丢失。

可以看到一共有三个阶段,分别是生产消息、存储消息和消费消息。我们从这三个阶段分别入手来看看如何确保消息不会丢失。

5.1.1 发送消息阶段如何保证消息不丢失

生产者发送消息至Broker,需要处理Broker的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch,妥善的处理响应,如果Broker返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。这样就能保证在生产消息阶段消息不会丢失。

5.1.2 存储消息阶段如何保证消息不丢失

单机情况下,存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。

如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢(假如怕两台都挂了…那就再多些)。

5.1.3 消费消息阶段如何保证消息不丢失

对于消息消费者,并不是拿到消息之后直接存入内存队列中就直接返回给Broker消费成功,需要考虑拿到消息放在内存之后消费者就宕机了怎么办,所以应该在消费者真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。

只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。

5.1.4 小结:如何保证消息不丢失

可以看出,保证消息的可靠性需要三方配合。

(1) 对于producer:producer需要使用try…catch…处理好Broker的响应,出错情况下利用重试、报警等手段。
(2) 对于broker:Broker需要控制响应的时机,单机情况下是消息刷盘后返回响应;集群多副本情况下,即发送至两个副本及以上的情况下再返回响应。
(3) 对于consumer:consumer需要在执行完真正的业务逻辑之后再返回响应给Broker。

注意,在producer-broker-consumer三者中避免消息丢失后,消息可靠性增强了,性能就下降了。以存储消息为例,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。

5.2 重复消费问题

5.2.1 两种情况

第一种情况:如果producer-broker发送消息不需要broker响应,不会出现消息重复;
缺点:不满足业务需求,这样做,一般情况我们是不允许这样的,这样消息就完全不可靠了。
第二种情况:如果我们发送消息,消息至少得发到Broker上,那就得等Broker的响应,那么producer-broker发送消息就会重复。

(1) 由于producer-broker出错,由于需要等待响应导致重复发送,最终导致重复消费 :如果我们发送消息,消息至少得发到Broker上,那就得等Broker的响应,那么就可能存在Broker已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复发送。如果producer-broker发送消息需要broker响应,从producer-broker可以重复发送消息的,这个重复发送的消息(在切换消费者的时候)又可以被 broker-consumer 重复消费。

对于producer-broker 和 broker-consumer 合并为的 producer-broker-consumer 整个处理阶段,我们都无法防止重复消息的产生。理由在于,如果producer-broker发送消息需要broker响应,从producer-broker可以重复发送消息的,这个重复发送的消息(在切换消费者的时候)又可以被 broker-consumer 重复消费。

(2) 由于broker-consumer出错,存在重复消费:如果消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新Consumer offset了,然后这个消费者挂了,另一个消费者顶上,此时Consumer offset还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息就重复消费了。

这种由于broker-consumer出错造成重复消费是无法从根本上避免的,这是由于消息队列的服务质量带来了消息重传。消息领域有一个对消息投递的QoS(Quality of Service,服务质量)定义,分为:最多一次(At most once)、至少一次(At least once)、仅一次( Exactly once)。几乎所有的MQ产品都声称自己做到了At least once。既然是至少一次,那就是 大于一次,可以producer可以发送大于一次的消息,那避免不了消息重传。比如,在broker-consumer下,由于网络错误,导致consumer确认信息没有传送到消息队列broker,导致消息队列broker不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

注意:不同的消息队列broker发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,Kafka实际上有个offset的概念,表示消费进度。RocketMQ没有内置消息去重的解决方案,最新版本是否支持还需确认。

以上两种情况导致的消费重复消费,由于producer必须需要broker响应这一点是无法改变的,由于consumer宕机这样的事情也是无法避免的。所以,正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响,即消息消费方的实现接口幂等性。

5.2.2 消费者接口幂等性

不解决消费重复,而是解决消费重复带来的影响,消费者consumer幂等处理,因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。

数学上的幂等:幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。

SQL语句的幂等:例如这条 SQLupdate t1 set money = 150 where id = 1 and money = 100; 执行多少遍money都是150,这就叫幂等。

幂等性实现方式(数据库+后端逻辑):
方式1(后端层面,service做一个前置判断):后端service做一个前置条件判断,即money = 100情况,并且直接修改;
方式2(数据表层面,实际mysql表中加一个version字段):实际mysql表中加一个version字段即版本号控制,对比消息中的版本号和数据库中的版本号。
方式3(数据表层面,数据库的约束例如唯一键):数据库的约束例如唯一键,例如insert into update on duplicate key…
方式4(数据表层面,新建一个duplicate表,插入之前先检查duplicate中是否存在(电商用订单流水号检查,即关键key),存在不操作,不存在就插入duplicate表,然后修改/插入实际表):新建一个duplicate表,插入之前先检查duplicate中是否存在,存在就不操作,不存在就插入duplicate表,然后修改/插入实际表。即 “记录关键的key,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一ID”。

5.3 消息顺序消费问题

消息顺序消费问题,有序性分:全局有序和部分有序。

5.3.1 全局有序

如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区),消费者也必须是单线程消费这个队列。即一个producer -> 一个broker -> 一个consumer,这样的消息就是全局有序的。不过一般情况下我们都不需要全局有序,即使是同步MySQL Binlog也只需要保证单表消息有序即可。

5.3.2 部分有序

因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者,即n个producer -> n个broker -> n个consumer,这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

图中我画了多个生产者,一个生产者也可以,只要同类消息发往指定的队列即可。

5.4 消息堆积问题

消息堆积定义:broker上消息堆积。

消息堆积根本原因:生产者的生产速度与消费者的消费速度不匹配,从而导致broker上消息堆积。

消息堆积两个直接原因(造成根本原因):consumer消息消费失败反复重试,consumer消费者消费能力弱。

消息堆积实践处理:
第一,程序错误:先定位消费慢的原因,如果是消费失败反复重试,则处理 bug ;
第二,批量消费:如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。
第三,硬件扩容:如果逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。

解释:注意队列数一定要增加,不然新增加的消费者是没东西消费的,一个Topic中,一个队列只会分配给一个消费者(RocketMQ中叫队列,Kafka中叫分区)?
前提:一个消费者组中三个消费者,一个broker
情况1:正常情况,一个消费者组中三个消费者对应三个partition,那么每个消费者消费一个partition
情况2:如果消费者组中的某个消费者挂了,则一个消费者组中,两个消费者对应三个partition,那么其中一个消费者可能就要消费两个partition了
情况3:如果只有三个partition,而消费者组有4个消费者,则一个消费者组中,四个消费者对应三个partition,那么一个消费者会空闲,没有东西可以消费
情况4:如果多加入一个消费者组,则两个消费者组对应三个partition,无论是新增的消费者组还是原本的消费者组,都能消费topic的全部数据。(消费者组之间从逻辑上它们是独立的)

当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提高的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。

5.5 分布式数据一致性问题

问题1:引入消息队列,处理多个后台服务之间的数据不一致问题?
回答1:分布式事务,处理服务间的数据不一致问题。
用了消息队列让分布式数据不一致问题暴露得比较严重一点:数据不一致是分布式服务本身就存在的一个问题,并不是消息队列导致了数据不一致,只是因为使用了消息队列让分布式数据不一致问题暴露得比较严重一点。类似地,java多线程,使用Thread.sleep(1000L),并不是导致了线程安全问题,只是让线程安全问题暴露的更加明显;Redis也存在这个数据更新和数据一致性问题,就是缓存数据和mysql中的数据。

问题2:为什么分布式系统存在数据不一致问题?
回答2:在分布式系统中,为了保证数据的高可用,通常会将数据保留多个副本(replica),这些副本会放置在不同的物理的机器上。在数据有多份副本的情况下,如果网络、服务器或者软件出现故障,会导致部分副本写入成功,部分副本写入失败。这就造成各个副本之间的数据不一致,数据内容冲突。就造成事实上的数据不一致。比如,下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?
解决方式:分布式事务。使用分布式事务,将下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。

六、消息队列的技术选型

目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。

现实是,ActiveMQ和RabbitMQ这两者因为吞吐量还有GitHub的社区活跃度的原因,而且不是分布式消息队列,无法适用微服务架构,所以,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。

第一,吞吐量:早期比较活跃的ActiveMQ 和RabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要。比如微博爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继。

第二,topic数量对于吞吐量的影响:
rocketmq topic达到几百几千吞吐量有下降,做分布式集群应对
kafka topic达到几十几百,吞吐量有下降,做分布式集群应对

第三,时效性/延时:rabbitMQ最好,所以单体架构,用户量不大的,要么直接用

第四,可用性和可靠性:前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。

第五,功能支持:
(1)RabbitMQ 开发语言是erlang,绝大部分java工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。
(2)RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。
(3)Kafka 是压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。

七、消息队列的推拉模型

7.1 推拉模式的时候指的是 Comsumer 和 Broker 之间的交互

一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。

问题1:为什么Producer和Broker之间使用推模式?
回答1:如果需要 Broker 去拉取消息,对于Producer,Producer 就必须在本地通过日志的形式保存消息来等待 Broker 的拉取,如果有很多生产者的话,那么消息的可靠性不仅仅靠 Broker 自身,还需要靠成百上千的 Producer。Broker 还能靠多副本等机制来保证消息的存储可靠,而成百上千的Producer 可靠性就有点难办了,所以默认的 Producer 都是推消息给 Broker。使用推模式,对于Producer,可以让Producer来控制生产速度;对于Broker,它自己可以持久化消息,保存起来。

问题2:为什么Consumer和Broker之间使用拉模式?
回答2:使用拉模式,对于Consummer,可以让Consumer来控制消费速度;对于Broker,它自己可以持久化消息,保存起来。

总之,Prouducer和Broker之间使用推模式,Consumer和Broker之间使用拉模式,将消息存储在Broker节点上。

问题3:从削峰填谷的角度来看推拉模式?
方案一,producer-broker推模式,broker-consumer拉模式,broker控制速度,就可以完成削峰填谷,其优点包括三个:
(1) producer控制生产速度:不影响producer效率,可以不断生产,无脑推就好了,
(2) broker节点存储消息:producer不需要日志保存,consumer不需要日志保存,让broker来完成消息日志保存。
(3) consumer控制消费速度:不影响consumer效率,consumer按照自己想要的速度来。
方案二,producer-broker拉模式,broker控制速度,无论broker-consumer什么模式,都可以完成削峰填谷,但是缺点是producer要保存日志了,特别麻烦。
方案三,producer-broker推模式,broker-consumer推模式,没有任何控制速度,无法削峰填谷、

小结:procduer-broker broker-consumer之间,必须有一个拉模式,否则削峰填谷无法完成。

7.2 broker-consumer 推模式

推模式定义:推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。

推模式优点:
(1) 消费速度快,消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
(2) 消费者实现简单,消费者就等着,反正有消息来了就会推过来。

推模式缺点:
(1) 推送速率难以适应消费速率:推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了,这样消息队列原本的削峰填谷的作用就根本发挥不出来,速率完全由producer来控制;
(2) 不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率:如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。这其实就增加了 Broker 自身的复杂度。

推模式适用情况:推模式难以根据消费者的状态控制推送速率,适用于实时性要求高、消息量不大、消费能力强的情况下。

7.3 broker-consumer 拉模式

拉模式定义:拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。

拉模式优点:
(1) consumer控制消费速率:拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
(2) Broker无需计算推送速率:拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。

小结:拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。

拉模式缺点:
(1) consumer消费慢了,造成消息延迟:毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
(2) consumer消费快了,造成消息忙请求:忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。

小结:两个缺点,统一就是consumer消费速度和producer生产速度不一致。

问题:各个消息中间件是如何选择broker与consumer之间的推拉模式的?
回答:RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。
实际上,拉模式更加的合适,理由:
理由1:对于Broker:现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。
理由2:对于Consumer:消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。

问题:竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么?
回答:怕,所以它们操作了一波,减轻了拉模式的缺点。解决方式就是长轮询,RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。
小结:Consumer 和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。

八、面试金手指

8.1 消息队列三功能

消息队列三功能:异步 + 解耦 + 流量控制(削峰填谷)

从同步处理到使用消息队列异步处理
优点:有了异步处理之后,在业务模块增加的情况下,保护主功能响应速度不受到影响。
缺点:
(1) 结构复杂:架构复杂一级,稳定性差一级,运维成本增加一级;
(2) 消息延迟消息队列的消息通知机制有延迟(生产者放入消息,消费者取出消息),失去非核心模块(即核心模块的下流模块)的实时性,但是可以接受。
小结:对于任何一个技术,优点大于缺点,就可以选用。

问题:多线程也可以实现并发,为什么一定要使用消息队列?
回答:因为消息队列可以在模块层面解耦,而多线程无法做到。

流量控制(削峰填谷)本质:将不均匀的用户请求到消息队列中过一遍,变成了后端能够处理的、最大的均匀的请求。

8.2 队列模型和发布/订阅模型

队列模型 和 发布/订阅模型
第一,两者定义:队列模型每条消息只能被一个消费者消费,而发布/订阅模型就是为让一条消息可以被多个消费者消费而生的,当然队列模型也可以通过消息全量存储,然后发送至多个队列来实现一条消息被多个消费者消费问题,但是会有数据的冗余,需要通过接口幂等性避免重复消费。
第二,两者联系:队列模式是发布/订阅模型的特例,发布/订阅模型兼容队列模型,即只有一个消费者的情况下和队列模型基本一致。
第三,两者应用:RabbitMQ 采用队列模型,RocketMQ和Kafka 采用发布/订阅模型。

8.3 消息队列的三个独有设置

8.3.1 使用队列或分区提高并发度

为了提高并发度,往往发布/订阅模型还会引入队列或者分区的概念(RocketMQ中叫队列,Kafka中叫分区),即消息是发往一个主题下的某个队列或者某个分区中,RocketMQ中叫队列,Kafka叫分区,本质一样。

问题:如何使用队列或分区提高broker并发度(RocketMQ中叫队列,Kafka叫分区)?
回答:例如某个主题topic下有 5 个队列,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息。

问题:既然一个broker可以有多个队列或多个分区(RocketMQ中叫队列,Kafka叫分区),如何均匀分配?
回答:遵循平均分配的原则。producer生产的消息,如果只分配到一个队列或分区中,那么和没使用多个队列/多个分区一个样,所以,一定要平均分配。遵循平均分配的原则,一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。

8.3.2 消费者组

与之对应的消费者一般都有组的概念 Consumer Group, 即消费者一定是属于某个消费组的。一条消息会发往多个订阅了这个主题topic的消费组。

问题:一条消息会发往多个订阅了这个主题topic的消费组?
回答:一个消费者组中三个消费者,一个broker
情况1:正常情况,一个消费者组中三个消费者对应三个partition,那么每个消费者消费一个partition
情况2:如果消费者组中的某个消费者挂了,则一个消费者组中,两个消费者对应三个partition,那么其中一个消费者可能就要消费两个partition了
情况3:如果只有三个partition,而消费者组有4个消费者,则一个消费者组中,四个消费者对应三个partition,那么一个消费者会空闲,没有东西可以消费
情况4:如果多加入一个消费者组,则两个消费者组对应三个partition,无论是新增的消费者组还是原本的消费者组,都能消费topic的全部数据(即消费者组之间从逻辑上它们是独立的,一条消息会发往多个订阅了这个主题的消费组)。

所以,现在有两个消费组分别是Group 1 和 Group 2,它们都订阅了Topic-a。此时有一条消息发往Topic-a,那么这两个消费组都能接收到这条消息。然后这条消息实际是写入Topic某个队列中,消费组中的某个消费者对应消费一个队列的消息。在物理上除了副本拷贝之外,一条消息在Broker中只会有一份,这就是RocketMQ/Kafka的高可用。

8.3.3 消费者的offset

从两个方面来看offset:
(1) 从队列/分区来说:offset是队列/分区级别的,一个队列/分区会维护各个消费者的offset;
(2) 从消费者组来说:offset是consumer级别的,每个消费者组的消费者,都会维护订阅的Topic下的每个队列的offset,上图中四个消费者,就有四个offset。
小结:同一个topic下,不同队列有不同offset;同一个topic同一个队列下,不同consumer不同offset。

8.4 消息队列五个问题

8.4.1 消息丢失问题

保证消息的可靠性需要三方配合:
(1) 对于producer:producer需要使用try…catch…处理好Broker的响应,出错情况下利用重试、报警等手段。
(2) 对于broker:Broker需要控制响应的时机,单机情况下是消息刷盘后返回响应;集群多副本情况下,即发送至两个副本及以上的情况下再返回响应。
(3) 对于consumer:consumer需要在执行完真正的业务逻辑之后再返回响应给Broker。

有的时候不需要这么高的可靠性,如果消息是日志收集的话,丢一两条关系不大:在producer-broker-consumer三者中避免消息丢失后,消息可靠性增强了,性能就下降了。以存储消息为例,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。

8.4.2 重复消息问题

第一种情况:如果producer-broker发送消息不需要broker响应,不会出现消息重复;
缺点:不满足业务需求,这样做,一般情况我们是不允许这样的,这样消息就完全不可靠了。
第二种情况:如果我们发送消息,消息至少得发到Broker上,那就得等Broker的响应,那么producer-broker发送消息就会重复。

(1) 由于producer-broker出错,由于需要等待响应导致重复发送,最终导致重复消费 :如果我们发送消息,消息至少得发到Broker上,那就得等Broker的响应,那么就可能存在Broker已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复发送。如果producer-broker发送消息需要broker响应,从producer-broker可以重复发送消息的,这个重复发送的消息(在切换消费者的时候)又可以被 broker-consumer 重复消费。

对于producer-broker 和 broker-consumer 合并为的 producer-broker-consumer 整个处理阶段,我们都无法防止重复消息的产生。理由在于,如果producer-broker发送消息需要broker响应,从producer-broker可以重复发送消息的,这个重复发送的消息(在切换消费者的时候)又可以被 broker-consumer 重复消费。

(2) 由于broker-consumer出错,存在重复消费:如果消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新Consumer offset了,然后这个消费者挂了,另一个消费者顶上,此时Consumer offset还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息就重复消费了。

这种由于broker-consumer出错造成重复消费是无法从根本上避免的,这是由于消息队列的服务质量带来了消息重传。消息领域有一个对消息投递的QoS(Quality of Service,服务质量)定义,分为:最多一次(At most once)、至少一次(At least once)、仅一次( Exactly once)。几乎所有的MQ产品都声称自己做到了At least once。既然是至少一次,那就是 大于一次,可以producer可以发送大于一次的消息,那避免不了消息重传。比如,在broker-consumer下,由于网络错误,导致consumer确认信息没有传送到消息队列broker,导致消息队列broker不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

注意:不同的消息队列broker发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,Kafka实际上有个offset的概念,表示消费进度。RocketMQ没有内置消息去重的解决方案,最新版本是否支持还需确认。

以上两种情况导致的消费重复消费,由于producer必须需要broker响应这一点是无法改变的,由于consumer宕机这样的事情也是无法避免的。所以,正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响,即消息消费方的实现接口幂等性。

不解决消费重复,而是解决消费重复带来的影响,消费者consumer幂等处理,因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。

数学上的幂等:幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。

SQL语句的幂等:例如这条 SQLupdate t1 set money = 150 where id = 1 and money = 100; 执行多少遍money都是150,这就叫幂等。

幂等性实现方式(数据库+后端逻辑):
方式1(后端层面,service做一个前置判断):后端service做一个前置条件判断,即money = 100情况,并且直接修改;
方式2(数据表层面,实际mysql表中加一个version字段):实际mysql表中加一个version字段即版本号控制,对比消息中的版本号和数据库中的版本号。
方式3(数据表层面,数据库的约束例如唯一键):数据库的约束例如唯一键,例如insert into update on duplicate key…
方式4(数据表层面,新建一个duplicate表,插入之前先检查duplicate中是否存在(电商用订单流水号检查,即关键key),存在不操作,不存在就插入duplicate表,然后修改/插入实际表):新建一个duplicate表,插入之前先检查duplicate中是否存在,存在就不操作,不存在就插入duplicate表,然后修改/插入实际表。即 “记录关键的key,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一ID”。

8.4.3 消息顺序消费问题

如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区),消费者也必须是单线程消费这个队列。即一个producer -> 一个broker -> 一个consumer,这样的消息就是全局有序的。不过一般情况下我们都不需要全局有序,即使是同步MySQL Binlog也只需要保证单表消息有序即可。

因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者,即n个producer -> n个broker -> n个consumer,这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

8.4.4 消息堆积问题

消息堆积定义:broker上消息堆积。

消息堆积根本原因:生产者的生产速度与消费者的消费速度不匹配,从而导致broker上消息堆积。

消息堆积两个直接原因(造成根本原因):consumer消息消费失败反复重试,consumer消费者消费能力弱。

消息堆积实践处理:
第一,程序错误:先定位消费慢的原因,如果是消费失败反复重试,则处理 bug ;
第二,批量消费:如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。
第三,硬件扩容:如果逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。

解释:注意队列数一定要增加,不然新增加的消费者是没东西消费的,一个Topic中,一个队列只会分配给一个消费者(RocketMQ中叫队列,Kafka中叫分区)?
前提:一个消费者组中三个消费者,一个broker
情况1:正常情况,一个消费者组中三个消费者对应三个partition,那么每个消费者消费一个partition
情况2:如果消费者组中的某个消费者挂了,则一个消费者组中,两个消费者对应三个partition,那么其中一个消费者可能就要消费两个partition了
情况3:如果只有三个partition,而消费者组有4个消费者,则一个消费者组中,四个消费者对应三个partition,那么一个消费者会空闲,没有东西可以消费
情况4:如果多加入一个消费者组,则两个消费者组对应三个partition,无论是新增的消费者组还是原本的消费者组,都能消费topic的全部数据。(消费者组之间从逻辑上它们是独立的)

8.4.5 分布式数据一致性问题

问题:为什么分布式系统存在数据不一致问题?
回答:在分布式系统中,为了保证数据的高可用,通常会将数据保留多个副本(replica),这些副本会放置在不同的物理的机器上。在数据有多份副本的情况下,如果网络、服务器或者软件出现故障,会导致部分副本写入成功,部分副本写入失败。这就造成各个副本之间的数据不一致,数据内容冲突。就造成事实上的数据不一致。比如,下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?
解决方式:分布式事务。使用分布式事务,将下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。

8.5 消息队列的技术选型

消息队列技术选型:

第一,吞吐量:早期比较活跃的ActiveMQ 和RabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要。比如微博爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继。

第二,topic数量对于吞吐量的影响:
rocketmq topic达到几百几千吞吐量有下降,做分布式集群应对
kafka topic达到几十几百,吞吐量有下降,做分布式集群应对

第三,时效性/延时:rabbitMQ最好,所以单体架构,用户量不大的,要么直接用

第四,可用性和可靠性:前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。

第五,功能支持:
(1)RabbitMQ 开发语言是erlang,绝大部分java工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。
(2)RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。
(3)Kafka 是压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。

8.6 消息队列的推拉模型

推模式优点:
(1) 消费速度快,消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。
(2) 消费者实现简单,消费者就等着,反正有消息来了就会推过来。

推模式缺点:
(1) 推送速率难以适应消费速率:推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了,这样消息队列原本的削峰填谷的作用就根本发挥不出来,速率完全由producer来控制;
(2) 不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率:如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。这其实就增加了 Broker 自身的复杂度。

推模式适用情况:推模式难以根据消费者的状态控制推送速率,适用于实时性要求高、消息量不大、消费能力强的情况下。

拉模式优点:
(1) consumer控制消费速率:拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
(2) Broker无需计算推送速率:拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是

以上是关于消息队列,高并发的救火员的主要内容,如果未能解决你的问题,请参考以下文章

高并发系统设计(十五):消息队列如何降低消息队列系统中消息的延迟?

大型分布式消息队列处理高并发

高并发框架

高并发处理思路与手段:消息队列

高并发为何高并发系统中都要使用消息队列?这次彻底懂了!

高并发架构系列:MQ消息队列的12点核心原理总结