七万字,151张图,通宵整理消息队列核心知识点总结!这次彻底掌握MQ!

Posted yes的练级攻略

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了七万字,151张图,通宵整理消息队列核心知识点总结!这次彻底掌握MQ!相关的知识,希望对你有一定的参考价值。

前言

本文主要涵盖了关于消息队列的大部分核心知识点,涉及的消息队列有 RocketMQ、Kafka。

本文很长,所有内容都为博主原创,纯手打,如果觉得不错的话,来个点赞评论收藏三连呀!

之后还会有迭代版本,当然也会有其他的总结输出,欢迎关注我,不迷路~

关于这篇文章也整理了 PDF,已上传至 CSDN,还准备了暗黑版本,更加护眼!

点击下载消息队列核心知识点PDF


排版也是 OK 滴!

本文大纲:

  • 从消息队列常见面试题入手来解析消息队列
  • 如何设计一个消息队列?
  • 消息队列设计成推消息还是拉消息?RocketMQ和Kafka是怎么做的?
  • 消息队列之事务消息?RocketMQ和Kafka是怎么做的?
  • 比 RocketMQ 更好的事务消息实现是什么?
  • Kafka的索引设计有什么亮点?
  • Kafka日志段如何读写解析?
  • Kafka控制器事件处理全流程解析
  • Kafka请求处理全流程解析
  • Kafka为什么要抛弃Zookeeper?
  • 进阶必看的 RocketMQ,这次一网打尽
  • Kafka和RocketMQ底层存储揭秘,为什么能这么快?
  • 未完待更新

从消息队列常见面试题入手来解析消息队列

今儿咱们就来盘一盘大方向上的消息队列有哪些核心注意点。

核心点有很多,为了更贴合实际场景,我从常见的面试问题入手:

  • 如何保证消息不丢失?
  • 如果处理重复消息?
  • 如何保证消息的有序性?
  • 如果处理消息堆积?

当然在剖析这几个问题之前需要简单的介绍下什么是消息队列,消息队列常见的一些基本术语和概念

接下来进入正文。

什么是消息队列

来看看维基百科怎么说的,顺带学学英语这波不亏:

In computer science, message queues and mailboxes are software-engineering components typically used for inter-process communication (IPC), or for inter-thread communication within the same process. They use a queue for messaging – the passing of control or of content. Group communication systems provide similar kinds of functionality.

翻译一下:在计算机科学领域,消息队列和邮箱都是软件工程组件,通常用于进程间或同一进程内的线程通信。它们通过队列来传递消息-传递控制信息或内容,群组通信系统提供类似的功能。

简单的概括下上面的定义:消息队列就是一个使用队列来通信的组件

上面的定义没有错,但就现在而言我们日常所说的消息队列常常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题。

为什么需要消息队列

从本质上来说是因为互联网的快速发展,业务不断扩张,促使技术架构需要不断的演进。

从以前的单体架构到现在的微服务架构,成百上千的服务之间相互调用和依赖。从互联网初期一个服务器上有 100 个在线用户已经很了不得,到现在坐拥10亿日活的微信。我们需要有一个「东西」来解耦服务之间的关系、控制资源合理合时的使用以及缓冲流量洪峰等等。

消息队列就应运而生了。它常用来实现:异步处理、服务解耦、流量控制

异步处理

随着公司的发展你可能会发现你项目的请求链路越来越长,例如刚开始的电商项目,可以就是粗暴的扣库存、下单。慢慢地又加上积分服务、短信服务等。这一路同步调用下来客户可能等急了,这时候就是消息队列登场的好时机。

调用链路长、响应就慢了,并且相对于扣库存和下单,积分和短信没必要这么的 “及时”。因此只需要在下单结束那个流程,扔个消息到消息队列中就可以直接返回响应了。而且积分服务和短信服务可以并行的消费这条消息。

可以看出消息队列可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能

服务解耦

上面我们说到加了积分服务和短信服务,这时候可能又要来个营销服务,之后领导又说想做个大数据,又来个数据分析服务等等。

可以发现订单的下游系统在不断的扩充,为了迎合这些下游系统订单服务需要经常地修改,任何一个下游系统接口的变更可能都会影响到订单服务,这订单服务组可疯了,真 ·「核心」项目组

所以一般会选用消息队列来解决系统之间耦合的问题,订单服务把订单相关消息塞到消息队列中,下游系统谁要谁就订阅这个主题。这样订单服务就解放啦!

流量控制

想必大家都听过「削峰填谷」,后端服务相对而言都是比较「弱」的,因为业务较重,处理时间较长。像一些例如秒杀活动爆发式流量打过来可能就顶不住了。因此需要引入一个中间件来做缓冲,消息队列再适合不过了。

网关的请求先放入消息队列中,后端服务尽自己最大能力去消息队列中消费请求。超时的请求可以直接返回错误。

当然还有一些服务特别是某些后台任务,不需要及时地响应,并且业务处理复杂且流程长,那么过来的请求先放入消息队列中,后端服务按照自己的节奏处理。这也是很 nice 的。

上面两种情况分别对应着生产者生产过快和消费者消费过慢两种情况,消息队列都能在其中发挥很好的缓冲效果。

注意

引入消息队列固然有以上的好处,但是多引入一个中间件系统的稳定性就下降一层,运维的难度抬高一层。因此要权衡利弊系统是演进的

消息队列基本概念

消息队列有两种模型:队列模型发布/订阅模型

队列模型

生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,
但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。

发布/订阅模型

为了解决一条消息能被多个消费者消费的问题,发布/订阅模型就来了。该模型是将消息发往一个Topic即主题中,所有订阅了这个 Topic 的订阅者都能消费这条消息。

其实可以这么理解,发布/订阅模型等于我们都加入了一个群聊中,我发一条消息,加入了这个群聊的人都能收到这条消息。
那么队列模型就是一对一聊天,我发给你的消息,只能在你的聊天窗口弹出,是不可能弹出到别人的聊天窗口中的。

讲到这有人说,那我一对一聊天对每个人都发同样的消息不就也实现了一条消息被多个人消费了嘛。

是的,通过多队列全量存储相同的消息,即数据的冗余可以实现一条消息被多个消费者消费。RabbitMQ 就是采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题。

这里还能看到假设群聊里除我之外只有一个人,那么此时的发布/订阅模型和队列模型其实就一样了。

小结一下

队列模型每条消息只能被一个消费者消费,而发布/订阅模型就是为让一条消息可以被多个消费者消费而生的,当然队列模型也可以通过消息全量存储至多个队列来解决一条消息被多个消费者消费问题,但是会有数据的冗余。

发布/订阅模型兼容队列模型,即只有一个消费者的情况下和队列模型基本一致。

RabbitMQ 采用队列模型,RocketMQKafka 采用发布/订阅模型。

接下来的内容都基于发布/订阅模型

常用术语

一般我们称发送消息方为生产者 Producer,接受消费消息方为消费者Consumer,消息队列服务端为Broker

消息从Producer发往BrokerBroker将消息存储至本地,然后ConsumerBroker拉取消息,或者Broker推送消息至Consumer,最后消费。

为了提高并发度,往往发布/订阅模型还会引入队列或者分区的概念。即消息是发往一个主题下的某个队列或者某个分区中。RocketMQ中叫队列,Kafka叫分区,本质一样。

例如某个主题下有 5 个队列,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息。一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。

与之对应的消费者一般都有组的概念 Consumer Group, 即消费者都是属于某个消费组的。一条消息会发往多个订阅了这个主题的消费组。

假设现在有两个消费组分别是Group 1Group 2,它们都订阅了Topic-a。此时有一条消息发往Topic-a,那么这两个消费组都能接收到这条消息。

然后这条消息实际是写入Topic某个队列中,消费组中的某个消费者对应消费一个队列的消息。

在物理上除了副本拷贝之外,一条消息在Broker中只会有一份,每个消费组会有自己的offset即消费点位来标识消费到的位置。在消费点位之前的消息表明已经消费过了。当然这个offset是队列级别的。每个消费组都会维护订阅的Topic下的每个队列的offset

来个图看看应该就很清晰了。

基本上熟悉了消息队列常见的术语和一些概念之后,咱们再来看看消息队列常见的核心面试点。

如何保证消息不丢失

就我们市面上常见的消息队列而言,只要配置得当,我们的消息就不会丢。

先来看看这个图,

可以看到一共有三个阶段,分别是生产消息、存储消息和消费消息。我们从这三个阶段分别入手来看看如何确保消息不会丢失。

生产消息

生产者发送消息至Broker,需要处理Broker的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch,妥善的处理响应,如果Broker返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。

这样就能保证在生产消息阶段消息不会丢失。

存储消息

存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。

如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢(假如怕两台都挂了…那就再多些)。

那假如来个地震机房机子都挂了呢?emmmmmm…大公司基本上都有异地多活。

那要是这几个地都地震了呢?emmmmmm…这时候还是先关心关心人吧。

消费消息

这里经常会有同学犯错,有些同学当消费者拿到消息之后直接存入内存队列中就直接返回给Broker消费成功,这是不对的。

你需要考虑拿到消息放在内存之后消费者就宕机了怎么办。所以我们应该在消费者真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。

所以只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。

小结一下

可以看出,保证消息的可靠性需要三方配合

生产者需要处理好Broker的响应,出错情况下利用重试、报警等手段。

Broker需要控制响应的时机,单机情况下是消息刷盘后返回响应,集群多副本情况下,即发送至两个副本及以上的情况下再返回响应。

消费者需要在执行完真正的业务逻辑之后再返回响应给Broker

但是要注意消息可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。

如果处理重复消息

我们先来看看能不能避免消息的重复。

假设我们发送消息,就管发,不管Broker的响应,那么我们发往Broker是不会重复的。

但是一般情况我们是不允许这样的,这样消息就完全不可靠了,我们的基本需求是消息至少得发到Broker上,那就得等Broker的响应,那么就可能存在Broker已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了。

再看消费者消费的时候,假设我们消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新Consumer offset了,然后这个消费者挂了,另一个消费者顶上,此时Consumer offset还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复了。

可以看到正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。

关键点就是幂等。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响。

幂等处理重复消息

幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的。

例如这条 SQL
update t1 set money = 150 where id = 1 and money = 100; 执行多少遍money都是150,这就叫幂等。

因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。

可以通过上面我那条 SQL 一样,做了个前置条件判断,即money = 100情况,并且直接修改,更通用的是做个version即版本号控制,对比消息中的版本号和数据库中的版本号。

或者通过数据库的约束例如唯一键,例如insert into update on duplicate key...

或者记录关键的key,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一ID等等。

基本上就这么几个套路,真正应用到实际中还是得看具体业务细节

如何保证消息的有序性

有序性分:全局有序和部分有序

全局有序

如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息,并且一个Topic内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!

不过一般情况下我们都不需要全局有序,即使是同步mysql Binlog也只需要保证单表消息有序即可。

部分有序

因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

图中我画了多个生产者,一个生产者也可以,只要同类消息发往指定的队列即可。

如果处理消息堆积

消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。

因此我们需要先定位消费慢的原因,如果是bug则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。

假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者

当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提高的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。

最后

上面的几个问题都是我们在使用消息队列的时候经常能遇到的问题,并且也是面试关于消息队列方面的核心考点。今天没有深入具体消息队列的细节,但是套路就是这么个套路,大方向上搞明白很关键

如何设计一个消息队列?

这种设计类问题想必大家都不陌生,面试时或多或少都能碰到。

比如如何写一个线程池?如何写一个 HashMap ?如何写一个 RPC 框架等等,当然这里的写不是真的叫你用代码写出来,只是说说设计理念,整体架构。

这个面试题来自于一个读者的字节面试经历,我会从面试技巧和消息中间件的设计两个方面阐述。

我觉得重点在于面试技巧,因为它通用。

两种极端的情况

大多数同学遇到这种问题会出现两种极端的情况:

  • 第一种:一脸懵逼,两眼无神,不知从何说起,万般思绪,都化作一声叹息。

  • 第二种:夸夸其谈,像是口中架起了一把加特林,哒哒哒哒哒哒哒哒,还冒着蓝火。

第一种不用说了,好一点的面试官可能会引导你,会问一些提示性的问题,一步一步地带你渐入佳境,当然你要是胸中无点滴,那还是没救的,场面就异常地尴尬。

第二种会把面试官整蒙了,或许你真的懂很多,很多细节也都清晰,但是你不能一股脑儿的都抛出来,这会显得你抓不住重点。

面试官也是人

这点其实很关键,很多把面试官当成一个莫得感情的提问机器人,觉得他无所不能可以完全 get 到你的点,殊不知你引以为傲的细节回答,他可能觉得你在说蛇皮。

是人就会有感情,就需要交流,好的面试官会把控整体进度,从拉家常开始,让场子热起来再一步一步的深挖。

当然也有一些面试官比较弱,这时候就需要你来特意地流出一点空白,来让面试官涂鸦,让面试官感觉你这人就很舒服,你这波就稳了。

当然即使面对着把控全场的面试官你也得主动出击,每个人都有自己的擅长点,你需要引导面试官来询问你的长处。

正确的回答姿势

正确的回答姿势是 BFS(广度优先搜索) 而不是 DFS (深度优先搜索),什么意思呢?

就是我们需要先从大局上讲出需要设计的东西的重点,然后再等待面试官的继续提问,深挖

我们需要揣摩面试官的心理,从他的提问可以看出他想要知道的重点是哪个方向的。

比如就拿 HashMap 来说,你简单的把获取、写入、冲突处理、扩容啥的都说了,然后等待面试官接下来的提问,有可能会往线程安全方面深入,也有可能会往扩容方向再挖,比如引出 Redis 的 hash 扩容等等。

所以说给面试官留提问的机会,抓住他的喜好或者说熟知的方向回答,这样如果你答得好,相互之间谈的来,面试官会对你高度认可。

而且在说各设计要点的时候也要注意停顿,要留机会给面试官插话,让面试官充分参与你的设计

还是拿 HashMap 作为例子,比如你说了获取、写入、冲突之后稍作停顿,这时候大概率面试官还会问还有吗?让面试官有参与感,让他感觉经过他的引导这个设计才逐步地完善

当然如果不问也没事,你停顿下继续说就行。

让面试成为一场技术交流,这是面试的最高境界,相信面试完了之后双方都会有意犹未尽的感觉,惺惺相惜就是这么来的。

但是这种场景也不是这么容易碰到的,首先你和面试官得有相同方向的喜好,比如你对 JVM 有很深入的研究,而面试官对存储方面有很深入的研究,JVM 懂的不深,这样就碰不出火花了。

所以说会有很多人碰到这么个情况:我面这个公司一面挂,另一家公司面面超神,这都是很正常的。

当然你要是说你全能,那当我没说。

小结一下面试技巧

首先要正确的看待面试官,你和面试官是同等的,不要一来就低声下气的。

其次回答问题需要抓住重点,不要一股脑儿的把你知道的都说了,要留白待面试官提问。

要把控面试的节奏,往自己熟知的方向上引。

如何写个消息中间件

接下来咱们再看看如何写个消息中间件。

首先我们需要明确地提出消息中间件的几个重要角色,分别是生产者、消费者、Broker、注册中心。

简述下消息中间件数据流转过程,无非就是生产者生成消息,发送至 Broker,Broker 可以暂缓消息,然后消费者再从 Broker 获取消息,用于消费。

而注册中心用于服务的发现包括:Broker 的发现、生产者的发现、消费者的发现,当然还包括下线,可以说服务的高可用离不开注册中心。

然后开始简述实现要点,可以同通信讲起:各模块的通信可以基于 Netty 然后自定义协议来实现,注册中心可以利用 zookeeper、consul、eureka、nacos 等等,也可以像 RocketMQ 自己实现简单的 namesrv (这一句话就都是关键词)。

为了考虑扩容和整体的性能,采用分布式的思想,像 Kafka 一样采取分区理念,一个 Topic 分为多个 partition,并且为保证数据可靠性,采取多副本存储,即 Leader 和 follower,根据性能和数据可靠的权衡提供异步和同步的刷盘存储。

并且利用选举算法保证 Leader 挂了之后 follower 可以顶上,保证消息队列的高可用。

也同样为了提高消息队列的可靠性利用本地文件系统来存储消息,并且采用顺序写的方式来提高性能。

可根据消息队列的特性利用内存映射、零拷贝进一步的提升性能,还可利用像 Kafka 这种批处理思想提高整体的吞吐。

至此就差不多了,该说的要点说的都差不多了,面试官心里已经想,这人好像有点东西。

之后可以深挖的点就很多了,比如提到的 Netty,各种注册中心就能问很多,比如各注册中心之间的选型对比等。

你还提到了选举算法,所以可能会问 Bully 算法、Raft 算法、ZAB 算法等等。

你还提到了分区,可能会问这个分区和 RocketMQ 的队列有什么不同啊?具体分区要怎么实现?

然后你提到顺序写,可能会问为什么要顺序写啊?你说的内存映射和零拷贝又是什么啊?那你知道 RocketMQ 和 Kafka 用了哪个吗?

当然还有可能问各种细节,比如消息的写入如何存储、消息的索引如何生成等等,来深挖看你有没有看过消息中间件的源码。

可以问的还很多,这篇文章我也不可能每个点都延伸开说,这些知识点还是得靠大家日积月累和平日的多加思考

当然日后的文章可以写一写今天提到的一些点,比如 Netty、选举算法啊,多种注册中心对比啊啥的。

面试官想问的是什么

再回到这个面试题,其实面试官想问的就是大方向上的设计,包括整体的架构、数据的流转和一些特性的把握,所以对于这个问题他想听到的就是那些重点,而不是那些细节。

而继续的深挖取决于你回答这个问题时提出的各个关键词,对于面试官自身而言熟悉的词一抓到,他就已经知道下一步要问你什么了。

所以在回答面试官的时候不仅要 get 到他的点,还得为之后的回答铺路,不会说的点不要提,擅长的点多提提。

最后

之前我已经提到了,这篇文章的重点其实不在于如何回答写一个消息中间件,而在于面试的技巧。

因为面试题千千万,而技巧掌握了那么千千万的面试题都适用。

我还想提一下关于面试的一些个人看法,我个人是面试驱动学习型选手,我学习的动力就是面试,我享受面试官问我啥我都嘴角一翘微微一笑的那种不羁。

但是我不提倡那种纯粹背面试题的做法,学习是一个日积月累的过程,就像我每篇文末说的,从一点点到亿点点,又像我每篇开头都会提的,每个时代,都不会亏待会学习的人。

我的面试驱动不仅仅是说为了面试而学习,还要以面试场景来学习,什么意思呢?

学任何一种东西,都模拟一个面试官在你前面,让他从各种角度向你提问,驱动你全方位的理解一个知识点,这才是我说的面试驱动学习型选手。

所以如果你看过我之前的文章会发现我经常会提出为什么呢,然后再作答。

还有一点要注意,动手能力,这很关键。

Talk is cheap, show me the code。

消息队列设计成推消息还是拉消息?RocketMQ和Kafka是怎么做的?

今天我们就来谈一谈消息队列的推拉模式,这也是一个面试热点,例如你在简历里面写了 RocketMQ ,基本上会问你 RocketMQ 采用的是推模式还是拉模式啊?是拉模式?不是有 PushConsumer 吗?

今天我们就来谈谈推拉模式,并且再来看看 RocketMQ 和 Kafka 是如何做的。

推拉模式

首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互

默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。

想象一下,如果需要 Broker 去拉取消息,那么 Producer 就必须在本地通过日志的形式保存消息来等待 Broker 的拉取,如果有很多生产者的话,那么消息的可靠性不仅仅靠 Broker 自身,还需要靠成百上千的 Producer。

Broker 还能靠多副本等机制来保证消息的存储可靠,而成百上千的 Producer 可靠性就有点难办了,所以默认的 Producer 都是推消息给 Broker。

所以说有些情况分布式好,而有些时候还是集中管理好。

推模式

推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。

我们来想一下推模式有什么好处?

消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。

对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。

推模式有什么缺点?

推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了。

并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。

这其实就增加了 Broker 自身的复杂度。

所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。

拉模式

拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。

我们来想一下拉模式有什么好处?

拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。

拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。

拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。

拉模式有什么缺点?

消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。

消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。

那到底是推还是拉

可以看到推模式和拉模式各有优缺点,到底该如何选择呢?

RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。

我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。

而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。

虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。

那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一波,减轻了拉模式的缺点。

长轮询

RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,我们就来看看它们是如何操作的。

为了简单化,下面我把消息不满足本次拉取的条数啊、总大小啊等等都统一描述成还没有消息,反正都是不满足条件。

RocketMQ 中的长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已

因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了。

后台会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入阻塞队列 pullRequestQueue 中。然后又有个 PullMessageService 线程不断的从阻塞队列 pullRequestQueue 中获取 pullRequest,然后通过网络请求 broker,这样实现的准实时拉取消息。

这一部分代码我不截了,就是这么个事儿,稍后会用图来展示。

然后 Broker 的 PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回,如果没有消息怎么办呢?我们来看一下代码。

我们再来看下 suspendPullRequest 方法做了什么。

而 PullRequestHoldService 这个线程会每 5 秒从 pullRequestTable 取PullRequest请求,然后看看待拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,最终调用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新尝试处理这个消息的请求,也就是再来一次,整个长轮询的时间默认 30 秒。

简单的说就是 5 秒会检查一次消息时候到了,如果到了则调用 processRequest 再处理一次。这好像不太实时啊? 5秒?

别急,还有个 ReputMessageService 线程,这个线程用来不断地从 commitLog 中解析数据并分发请求,构建出 ConsumeQueue 和 IndexFile 两种类型的数据,并且也会有唤醒请求的操作,来弥补每 5s 一次这么慢的延迟

代码我就不截了,就是消息写入并且会调用 pullRequestHoldService#notifyMessageArriving。

最后我再来画个图,描述一下整个流程。

Kafka 中的长轮询

像 Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。

简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。

并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回。

我们来简单的看一下源码,为了突出重点,我会删减一些代码。

先来看消费者端的代码。

上面那个 poll 接口想必大家都很熟悉,其实从注解直接就知道了确实是等待数据的到来或者超时,我们再简单的往下看。

我们再来看下最终 client.poll 调用的是什么。

最后调用的就是 Kafka 包装过的 selector,而最终会调用 Java nio 的 select(timeout)

现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的

Broker 处理所有请求的入口其实我在之前的文章介绍过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。

这个方法进来,我截取最重要的部分。

下面的图片就是 fetchMessages 方法内部实现,源码给的注释已经很清晰了,大家放大图片看下即可。

这个炼狱名字取得很有趣,简单的说就是利用我之前文章提到的时间轮,来执行定时任务,例如这里是delayedFetchPurgatory,专门用来处理延迟拉取操作。

我们先简单想一下,这个延迟操作都需要实现哪些方法,首先构建的延迟操作需要有检查机制,来查看消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还需要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。

这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:

  • isCompleted 检查条件是否满足的方法
  • tryComplete 条件满足之后执行的方法
  • onComplete 执行完毕之后调用的方法
  • onExpiration 过期之后需要执行的方法

判断是否过期就是由时间轮来推动判断的,但是总不能等过期的时候再去看消息到了没吧?

这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延迟请求消息来了,具体代码我不贴了, 在 ReplicaManager#appendRecords 方法内部再深入个两方法可以看到。

不过虽说代码不贴,图还是要画一下的。

小结一下

可以看到 RocketMQ 和 Kafka 都是采用“长轮询”的机制,具体的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。

一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。

最后

总的而言推拉模式各有优劣,而我个人觉得一般情况下拉模式更适合于消息队列。

看了这篇文章相信之后面试官问你推还是拉?建议给他个歪嘴笑。

消息队列之事务消息?RocketMQ和Kafka是怎么做的?

今天我们来谈一谈消息队列的事务消息,一说起事务相信大家都不陌生,脑海里蹦出来的就是 ACID。

通常我们理解的事务就是为了一些更新操作要么都成功,要么都失败,不会有中间状态的产生,而 ACID 是一个严格的事务实现的定义,不过在单体系统时候一般都不会严格的遵循 ACID 的约束来实现事务,更别说分布式系统了。

分布式系统往往只能妥协到最终一致性,保证数据最终的完整性和一致性,主要原因就是实力不允许…因为可用性为王。

而且要保证完全版的事务实现代价很大,你想想要维护这么多系统的数据,不允许有中间状态数据可以被读取,所有的操作必须不可分割,这意味着一个事务的执行是阻塞的,资源是被长时间锁定的。

在高并发情况下资源被长时间的占用,就是致命的伤害,举一个有味道的例子,如厕高峰期,好了懂得都懂。

对了, ACID 是什么还不太清楚的同学,赶紧去查一查,这里我就不展开说了。

分布式事务

那说到分布式事务,常见的有 2PC、TCC 和事务消息,这篇文章重点就是事务消息,不过 2PC 和 TCC 我稍微提一下。

2PC

2PC 就是二阶段提交,分别有协调者和参与者两个角色,二阶段分别是准备阶段和提交阶段。

准备阶段就是协调者向各参与者发送准备命令,这个阶段参与者除了事务的提交啥都做了,而提交阶段就是协调者看看各个参与者准备阶段都 o 不 ok,如果有 ok 那么就向各个参与者发送提交命令,如果有一个不 ok 那么就发送回滚命令。

这里的重点就是 2PC 只适用于数据库层面的事务,什么意思呢?就是你想在数据库里面写一条数据同时又要上传一张图片,这两个操作 2PC 无法保证两个操作满足事务的约束。

而且 2PC 是一种强一致性的分布式事务,它是同步阻塞的,即在接收到提交或回滚命令之前,所有参与者都是互相等待,特别是执行完准备阶段的时候,此时的资源都是锁定的状态,假如有一个参与者卡了很久,其他参与者都得等它,产生长时间资源锁定状态下的阻塞

总体而言效率低,并且存在单点故障问题,协调者是就是那个单点,并且在极端条件下存在数据不一致的风险,例如某个参与者未收到提交命令,此时宕机了,恢复之后数据是回滚的,而其他参与者其实都已经执行了提交事务的命令了。

TCC

TCC 能保证业务层面的事务,也就是说它不仅仅是数据库层面,上面的上传图片这种操作它也能做。

TCC 分为三个阶段 try - confirm - cancel,简单的说就是每个业务都需要有这三个方法,先都执行 try 方法,这一阶段不会做真正的业务操作,只是先占个坑,什么意思呢?比如打算加 10 个积分,那先在预添加字段加上这 10 积分,这个时候用户账上的积分其实是没有增加的。

然后如果都 try 成功了那么就执行 confirm 方法,大家都来做真正的业务操作,如果有一个 try 失败了那么大家都执行 cancel 操作,来撤回刚才的修改。

可以看到 TCC 其实对业务的耦合性很大,因为业务上需要做一定的改造才能完成这三个方法,这其实就是 TCC 的缺点,并且 confirm 和 cancel 操作要注意幂等,因为到执行这两步的时候没有退路,是务必要完成的,因此需要有重试机制,所以需要保证方法幂等。

事务消息

事务消息就是今天文章的主角了,它主要是适用于异步更新的场景,并且对数据实时性要求不高的地方

它的目的是为了解决消息生产者与消息消费者的数据一致性问题。

比如你点外卖,我们先选了炸鸡加入购物车,又选了瓶可乐,然后下单,付完款这个流程就结束了。

而购物车里面的数据就很适合用消息通知异步删除,因为一般而言我们下完单不会再去点开这个店家的菜单,而且就算点开了购物车里还有这些菜品也没有关系,影响不大。

我们希望的就是下单成功之后购物车的菜品最终会被删除,所以要点就是下单和发消息这两个步骤要么都成功要么都失败

RocketMQ 事务消息

我们先来看一下 RocketMQ 是如何实现事务消息的。

RocketMQ 的事务消息也可以被认为是一个两阶段提交,简单的说就是在事务开始的时候会先发送一个半消息给 Broker。

半消息的意思就是这个消息此时对 Consumer 是不可见的,而且也不是存在真正要发送的队列中,而是一个特殊队列。

发送完半消息之后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,还是发送回滚消息。

此时有人说这一步发送提交或者回滚消息失败了怎么办?

影响不大,Broker 会定时的向 Producer 来反查这个事务是否成功,具体的就是 Producer 需要暴露一个接口,通过这个接口 Broker 可以得知事务到底有没有执行成功,没成功就返回未知,因为有可能事务还在执行,会进行多次查询。

如果成功那么就将半消息恢复到正常要发送的队列中,这样消费者就可以消费这条消息了。

我们再来简单的看下如何使用,我根据官网示例代码简化了下。

可以看到使用起来还是很简便直观的,无非就是多加个反查事务结果的方法,然后把本地事务执行的过程写在 TransationListener 里面。

至此 RocketMQ 事务消息大致的流程已经清晰了,我们画一张整体的流程图来过一遍,其实到第四步这个消息要么就是正常的消息,要么就是抛弃什么都不存在,此时这个事务消息已经结束它的生命周期了。

RocketMQ 事务消息源码分析

然后我们再从源码的角度来看看到底是怎么做的,首先我们看下sendMessageInTransaction 方法,方法有点长,不过没有关系结构还是很清晰的。

流程也就是我们上面分析的,将消息塞入一些属性,标明此时这个消息还是半消息,然后发送至 Broker,然后执行本地事务,然后将本地事务的执行状态发送给 Broker ,我们现在再来看下 Broker 到底是怎么处理这个消息的

在 Broker 的 SendMessageProcessor#sendMessage 中会处理这个半消息请求,因为今天主要分析的是事务消息,所以其他流程不做分析,我大致的说一下原理。

简单的说就是 sendMessage 中查到接受来的消息的属性里面MessageConst.PROPERTY_TRANSACTION_PREPARED 是 true ,那么可以得知这个消息是事务消息,然后再判断一下这条消息是否超过最大消费次数,是否要延迟,Broker 是否接受事务消息等操作后,将这条消息真正的 topic 和队列存入属性中,然后重置消息的 topic 为RMQ_SYS_TRANS_HALF_TOPIC,并且队列是 0 的队列中,使得消费者无法读取这个消息。

以上就是整体处理半消息的流程,我们来看一下源码。

就是来了波狸猫换太子,其实延时消息也是这么实现的,最终将换了皮的消息入盘。

Broker 处理提交或者回滚消息的处理方法是 EndTransactionProcessor#processRequest,我们来看一看它做了什么操作。

可以看到,如果是提交事务就是把皮再换回来写入真正的topic所属的队列中,供消费者消费,如果是回滚则是将半消息记录到一个 half_op 主题下,到时候后台服务扫描半消息的时候就依据其来判断这个消息已经处理过了。

那个后台服务就是 TransactionalMessageCheckService 服务,它会定时的扫描半消息队列,去请求反查接口看看事务成功了没,具体执行的就是TransactionalMessageServiceImpl#check 方法。

我大致说一下流程,这一步骤其实涉及到的代码很多,我就不贴代码了,有兴趣的同学自行了解。不过我相信用语言也是能说清楚的。

首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC下的所有队列,如果还记得上面内容的话,就知道半消息写入的队列是 id 是 0 的这个队列,然后取出这个队列对应的 half_op 主题下的队列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC 主题下的队列。

这个 half_op 主要是为了记录这个事务消息已经被处理过,也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在 half_op 中。

然后调用 fillOpRemoveMap 方法,从 half_op 取一批已经处理过的消息来去重,将那些没有记录在 half_op 里面的半消息调用 putBackHalfMsgQueue 又写入了 commitlog 中,然后发送事务反查请求,这个反查请求也是 oneWay,即不会等待响应。当然此时的半消息队列的消费 offset 也会推进。

然后producer中的 ClientRemotingProcessor#processRequest 会处理这个请求,会把任务扔到 TransactionMQProducer 的线程池中进行,最终会调用上面我们发消息时候定义的 checkLocalTransactionState 方法,然后将事务状态发送给 Broker,也是用 oneWay 的方式。

看到这里相信大家会有一些疑问,比如为什么要有个 half_op ,为什么半消息处理了还要再写入 commitlog 中别急听我一一道来。

首先 RocketMQ 的设计就是顺序追加写入,所以说不会更改已经入盘的消息,那事务消息又需要更新反查的次数,超过一定反查失败就判定事务回滚。

因此每一次要反查的时候就将以前的半消息再入盘一次,并且往前推进消费进度。而 half_op 又会记录每一次反查的结果,不论是提交还是回滚都会记录,因此下一次还循环到处理此半消息的时候,可以从 half_op 得知此事务已经结束了,因此就被过滤掉不需要处理了。

如果得到的反查的结果是 UNKNOW,那 half_op 中也不会记录此结果,因此还能再次反查,并且更新反查次数。

到现在整个流程已经清晰了,我再画个图总结一下 Broker 的事务处理流程。

Kafka 事务消息

Kafka 的事务消息和 RocketMQ 的事务消息又不一样了,RocketMQ 解决的是本地事务的执行和发消息这两个动作满足事务的约束。

而 Kafka 事务消息则是用在一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败,就像下面代码所演示的。

Kafka 的事务基本上是配合其幂等机制来实现 Exactly Once 语义的,所以说 Kafka 的事务消息不是我们想的那种事务消息,RocketMQ 的才是。

讲到这我就想扯一下了,说到这个 Exactly Once 其实不太清楚的同学很容易会误解。

我们知道消息可靠性有三种,分别是最多一次、恰好一次、最少一次,之前在消息队列连环问的文章我已经提到了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次。

消息恰好被消费一次当然我们所有人追求的,但是之前文章我已经从各方面已经分析过了,基本上难以达到。

而 Kafka 竟说它能实现 Exactly Once?这么牛啤吗?这其实是 Kafka 的一个噱头,你要说他错,他还真没错,你要说他对但是他实现的 Exactly Once 不是你心中想的那个 Exactly Once。

它的恰好一次只能存在一种场景,就是从 Kafka 作为消息源,然后做了一番操作之后,再写入 Kafka 中

那他是如何实现恰好一次的?就是通过幂等,和我们在业务上实现的一样通过一个唯一 Id, 然后记录下来,如果已经记录过了就不写入,这样来保证恰好一次。

所以说 Kafka 实现的是在特定场景下的恰好一次,不是我们所想的利用 Kafka 来发送消息,那么这条消息只会恰巧被消费一次

这其实和 Redis 说他实现事务了一样,也不是我们心想的事务。

所以开源软件说啥啥特性开发出来了,我们一味的相信,因此其往往都是残血的或者在特殊的场景下才能满足,不要被误导了,不能相信表面上的描述,还得详细的看看文档或者源码。

不过从另一个角度看也无可厚非,作为一个开源软件肯定是想更多的人用,我也没说谎呀,我文档上写的很清楚的,这标题也没骗人吧?

确实,比如你点进震惊xxxx标题的文章,人家也没骗你啥,他自己确实震惊的呢。

再回来谈 Kafka 的事务消息,所以说这个事务消息不是我们想要的那个事务消息,其实不是今天的主题了,不过我还是简单的说一下。

Kafka 的事务有事务协调者角色,事务协调者其实就是 Broker 的一部分。

在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息

然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。

最后协调者会向事务日志中再记一条事务结束信息,至此 Kafka 事务就完成了,我拿 confluent.io 上的图来总结一下这个流程。

最后

至此我们已经知道了 RocketMQ 和 Kakfa 的事务消息全流程,可以看到 RocketMQ 的事务消息才是我们想要的,当然你要是用的流式计算那么 Kakfa 的事务消息也是你想要的。

比 RocketMQ 更好的事务消息实现是什么?

先抛出的一个问题:一个事务涉及 mysql 和 mq

以上是关于七万字,151张图,通宵整理消息队列核心知识点总结!这次彻底掌握MQ!的主要内容,如果未能解决你的问题,请参考以下文章

5万字97 张图总结操作系统核心知识点

干货总结!Kafka 面试大全(万字长文,37 张图,28 个知识点)

MQ4万字保姆教程|RabbitMQ知识点整理与Springboot整合附Demo(图文并茂)

超详细万字长文,我画了近百张图来理解红黑树

万字总结 MySQL核心知识,赠送25连环炮

七万字❤️零基础学算法 LeetCode 热题 HOT 100(附题解,强烈建议收藏)