万亿级消息背后: 小米消息队列的实践

Posted InfoQ

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了万亿级消息背后: 小米消息队列的实践相关的知识,希望对你有一定的参考价值。

作者丨勇幸

策划丨田晓旭

小米内部在 2015 年之前使用的是 Kafka 0.8 版本,当时的痛点比较多,由于 Kafka 本身是存储计算耦合的架构,使得数据不均衡的问题经常凸显,集群扩容、故障恢复也变得异常麻烦,给运维工作带来不少痛苦;同时,由于 Consumer 的 Rebalance 算法每次都是全部重新计算,使得业务的消费体验也不是很好;

作者批注:存储计算耦合的架构在扩容和故障转移时都需要进行数据搬迁;故障恢复时一般要经历复杂的算法先选举 Leader,且提供服务前要先保证各副本数据是一致的。

万亿级消息背后: 小米消息队列的实践

我们期望能有一个快速扩容、无状态无感知的消息队列,同时具有很好的容错机制,故障时可以快速恢复,减少运维的复杂度;消费端希望发生变化时能有一个最小损失的算法,让消费尽量平滑;同时这个系统需要高度定制化,以满足小米内部业务和生态链公司的需求,比如多租户相关的特性,跨机房的 Replication 机制等;

作者批注:当时开源的消息队列在多租户方面都不是很完备,更多倾向在数据管道或解耦 RPC 调用的特性上。

在这样的背景下,有了小米自研的分布式消息队列:Talos,它立项的目标是对内满足小米各部门的业务需求,对外为生态链公司输出中间件能力,系统本身对标 AWS Kinesis 与 Apache Kafka。

万亿级消息背后: 小米消息队列的实践

我们来看一下 Talos 目前在小米系统架构中的角色,主要有两种:在线的消息队列与数据集成平台的总线;

万亿级消息背后: 小米消息队列的实践

在线消息队列主要是直接使用 Talos 的 SDK — Producer 和 Consumer 来生产、消费消息;

数据集成平台总线如图,左侧是各种数据源,Talos 平台为用户提供了连接各种数据源的 Source Connector,比如 Web Source、Agent Source、BinLog Source 等,这些组件可以帮助业务将埋点数据、日志文件、BinLog 日志等导入到 Talos;平台会根据用户在产品界面配置的 Sink 需求自动将数据 transfer 到下游的各种存储 / 检索系统中,比如 HDFS、Kudu、ElasticSearch 等,然后用户可以使用各种计算引擎访问 / 查询这些数据,辅助 BI 决策;

架构与关键问题

Talos 是一个存储计算分离的架构,底层基于 HDFS 来存储实际的 Message,使用 HDFS 也是得益于小米在存储方面多年的深耕和积累;上层 Talos Server 是无状态无中心的计算节点,使用一致性哈希作为 Partition 调度和负载均衡的策略;

万亿级消息背后: 小米消息队列的实践

当 Producer/Consumer 读写时,Talos Server 收到请求后首先会根据一致性哈希计算所请求的 Topic-Partition 应该由哪一台机器 Serve,如果是由自己 Serve,则直接向 HDFS 发起请求;如果是由其他机器 Serve,则会先做一次转发。Talos 系统的一些 Meta 信息存储在 ZooKeeper 中,除此之外,一些控制流信息也是通过 ZK 来完成,比如 Talos Server 的上下线,Topic DDL 相关的操作广播等。

万亿级消息背后: 小米消息队列的实践

作为存储计算分离的架构,计算层 TalosServer 负责 Topic 的逻辑分区 Partition 的调度和管理,存储层 HDFS 负责具体物理分片的存储和高可用,这样的分层是有不少优点的:

1)计算层由于无状态,管理调度都非常简单,天然支持水平扩展,扩容迅速;故障转移也更加快速简单,不需要关心数据的同步和一致性,专注于计算逻辑;

2)存储层专注数据管理的高可靠和高可用,使用现有比较成熟的存储系统,可以间接降低系统复杂度;

作者批注:相比计算框架的快速更新换代,存储系统这些年相对比较成熟,迭代一般是性能方面的提升;依赖 HDFS,可以让 Talos 在数据存储方面走“捷径”。

在这样的架构设计下,实现 Talos 系统有几个问题是比较关键的,这里列举三个说明:

万亿级消息背后: 小米消息队列的实践

首先是 DFS Client 的 Tailing Read,我们知道 HDFS 正在写入的 Block 数据对用户是不可见的,但是消息队列的使用场景大部分都是边写边读的,为了能够支撑 Talos 的场景,小米内部对 HDFS Client 做了改造,使其支持最后一个 Block 的 Tailing Read;

其次是 Talos 的一致性模型,即必须保障对于某个 Partition,同一时刻只有一个 TalosServer 写入,否则就会出现脑裂;Talos 规避脑裂的机制主要包括两方面:1)利用 HDFS RecoverLease 保证 Partition 发生迁移时最后一个文件不会再被写入;2)设计特定的 Fencing 机制来保证 Partition 对应的文件目录的操作原子性;通过这两方面来保证 Talos 写操作的一致性,详细细节可以关注后续文章《Talos 一致性模型》。

作者批注:这里的 Partition 在物理上实际对应的是 HDFS 的一个目录。

第三个问题是分区延迟分配,由于 Talos 采用一致性哈希进行 Partition 调度,当集群滚动升级或某个节点重启时,就会带来 Partition 的迁出以及重新迁入,这种开销是完全可以避免的,尤其当集群比较大时,这种频繁的迁移会导致很差的消费体验;Talos 使用一种 Partition 延迟分配的策略来规避这个问题,减少因为频繁迁移带来的通信开销和网络开销,保证消费的平滑。

性能与资源优化

万亿级消息背后: 小米消息队列的实践

随着业务规模的爆发式增长,Talos 在性能和资源方面也遇到一些瓶颈与挑战,目前 Talos 的规模大致如上:

  • 日处理消息数超过 2 万亿条,日消息峰值 4 千万条 / 秒,日处理数据量 1.3PB;

  • Topic 总数 13000+,下游的作业数 15000+,接入业务数量 350+ ;

万亿级消息背后: 小米消息队列的实践

面对这样规模的背景下,Talos 在性能和资源方面的优化主要包括以上几个方面,接下来我们针对各方面一一展开。

首先是线程模型的改造。Talos 开始使用的读写线程模型就是一个简单的线程池,当一个请求到来时哈希到不同的线程上处理,这样做的问题是,当有一个 Topic 的请求卡主或变慢时,会影响在这个线程上排队的其他 Topic 的请求;我们希望既能尽量避免不同 Topic 请求间的相互影响,又能充分利用空闲的线程,于是做了一个“具有记忆功能的最小堆”线程池,如下图右边所示:

万亿级消息背后: 小米消息队列的实践

这个线程池包含两部分,一部分是记录当前正在线程上排队的请求个数的线程最小堆,一部分是记录当前正在 runtime 的 Topic 请求对应的处理线程;假设 T1_P1 表示 Topic1-Partition1 的请求,当有新的 T1_P1 请求到来时,首先看记录表中是否有线程正在处理 T1_P1 的请求,如果有则使用相同的线程,如果没有则从最小堆中取一个最空闲的线程来处理;这样既能保证当有请求变慢或卡住时,尽量不会影响其他的请求,也能最大化利用空闲的线程。

万亿级消息背后: 小米消息队列的实践

由于 Talos 存在转发的情况,转发线程也存在互相干扰的问题,例如同一个节点转发到其他不同节点间可能互相影响,转发读请求和写请求之间也会互相影响,这一部分我们也做了改造;最后我们总结了线程模型改造的经验:核心原则是避免竞争,具体展开可以分为三个方面:1)不同处理对象之间,例如不同的 Topic-Partition 请求;2)不同的操作之间,例如读与写;3)一个流程有多个步骤,将耗时步骤分解出来使用单独的线程处理;

万亿级消息背后: 小米消息队列的实践

第二部分是写优化,简单总结就是将多次小 I/O 合并成大 I/O,提升系统响应和吞吐。Talos 为了实现数据不丢的语义,对于客户端的每一次写请求,服务端都会执行一次 HDFS flush,如上图,对于 Partition1 的 3 次请求,会调用 3 次 flush;这种情况在流量突增或高 QPS 场景下会比较吃力;

万亿级消息背后: 小米消息队列的实践

传统的生产者消费者模型每次都是从队列中取出一个 Task 消费,我们在现有 Task 的基础上抽象出一个虚拟的 Task,当某个 Partition 的请求被处理时,它会拿到当前这个时刻到达 Server 端该 Partition 所有的请求,并将其合并成一个大的 I/O,进行一次整体 flush,如果成功则拆开多次返回给客户端成功,如果失败则拆开多次分别返回给客户端异常信息;这样的优化后单机 QPS 从 1K 提升到 1W+,在同样 QPS 的压力下,延迟也有很大提升,例如在单机 5000 QPS 的情况下,P99 延迟从 70ms 降低到 5ms。

万亿级消息背后: 小米消息队列的实践

一般来讲,绝大部分系统应该不会遇到上下文切换带来的烦恼,但是在高并发场景下,线程数设置的过多会大大降低系统性能;除了设置过多的线程,竞争锁也会带来上下文切换,持锁时间、持锁粒度,wait-notify 等使用不合理都会带来上下文切换的负担;此外 GC 也是带来上下文切换的重要因素;为了避免上下文切换影响系统性能,可以将这个指标放在服务性能监控的展板中,随时观测防患于未然;

Talos 在 GC 问题上经历了三个阶段,最早是频繁 CMS 带来的延迟影响,对内存参数调优后解决这个问题,但是每过一段时间,大概 20-30 天的周期,就会进行一次 Full GC,对延迟敏感的线上业务就会受到影响;Talos 在将 GC 算法改为 G1 时进行了适当的调优,调优使用的工具是 HotSpot 开发的 gc 日志工具,这其中最重要的是要确定服务的常驻内存,依此设置好 IHOP,然后调整 Young GC 的耗时以及触发 Mixed GC 的时间间隔等;

万亿级消息背后: 小米消息队列的实践

如上图是调优前后的 GC 耗时对比,经过针对性调优,GC 耗时由之前的大部分高于 100ms 变为 70ms 以内;

万亿级消息背后: 小米消息队列的实践

对于高吞吐型的服务,带宽资源往往会成为瓶颈,针对带宽 Talos 主要做了两方面工作:客户端寻址与基于流量的个性化一致性哈希;通过计算我们发现基于转发模型的 Talos,有 95% 以上的请求都需要被转发,这会带来很大的带宽消耗,客户端寻址让带宽资源节省了 40%,同时由于省略了转发,P95 的延迟也优化了 50%。

万亿级消息背后: 小米消息队列的实践

通过一致性哈希来负载均衡,Talos 每个节点 Serve 的 Partition 个数基本上是均匀的,但由于业务数据的多样性,Partition 之间的流量差别是很大的,这就会带来 I/O 的不均衡,从而影响系统性能;如上图,左侧 3 个节点都有 5 个 Partition,但是节点 2 与 节点 3 之间的流量差值却有一倍多,我们希望通过一些策略,可以达到右侧的效果,让节点间的流量趋于均衡;

Talos 对一致性哈希做出了改造,通过引入流量因子,对一致性哈希中每个物理节点智能调整其虚拟节点的个数,从而影响物理节点 Serve 的 Partition 个数,来达到流量的均衡;具体来讲:

1)Talos 会获取每个物理节点历史的日均流量,根据集群节点个数,计算出每个节点平均的日均流量期望值

2)以期望值为标准,大于期望值的节点会被减少虚拟节点数量,反之会被增加虚拟节点

3)虚拟节点变化后会重新计算 Partition 的分布,然后根据 Partition 流量算出各个节点新的流量分布

4)重复多轮调整直到流量分布达到预期阈值

万亿级消息背后: 小米消息队列的实践

如上是 Talos 一个集群负载均衡优化前后的对比图,横坐标是节点机器 ID,纵坐标是节点的日均流量,优化后机器间日均流量的差值缩小了 50%+,具体细节可以关注后续文章《Talos 负载均衡实践》;

作者批注:一致性哈希的个性化改造可以广泛应用到各种指标,比如流量、QPS 等;相关的专利正在申请中。

平台化效率

Talos 在平台化的演进中,监控和计量是不可忽视的一部分,为了能满足高效、高吞吐、统一的监控与计量需求,我们抽象出一套通用的框架服务于各模块,主要包含三部分:指标收集、指标整理以及指标展示;

万亿级消息背后: 小米消息队列的实践

各模块及 Talos 服务本身的进程会将需要的指标打到本机部署的 Agent,Agent 会将指标转到 Talos 特定的 Topic 中,后端会有高可用的流式作业将数据实时导入 Druid,必要时做一些指标整理,然后基于 Druid 将重要的指标展示到 Dashboard 与 Falcon 中,供监控和报警;其中一些机器级别等不需要聚合的指标也会直接推送到 Falcon 用于报警使用;

基于以上的流程,Talos 提供了一套多维度(服务维度 / 集群维度 / 机器维度)、多视角(用户视角 / 开发视角 / 运维视角)的指标监控与运营平台;同时 Talos 服务的计量计费也是基于这套框架来对各部门业务的数据进行实时计量与计费;

万亿级消息背后: 小米消息队列的实践

作为平台,不可避免的问题是资源管理,对于 Talos 来说,就是 Partition 资源配额的申请与审批;之前是人工管理,耗费了大量的精力和时间,我们对业务的真实流量和 QPS 做了统计,模拟出真实增长需求的阈值,然后对比资源配额的申请发现,有 70% 的需求都是在这个阈值以下,可以直接审批通过的;有 28% 的单子是用户夸大了需求,会造成资源浪费;超过阈值且真实的需求很少,只有这部分才需要人为干预,去评估业务这种突然很大配额需求;

我们把夸大需求的各种 Case 抽象成异常,让平台自动拒绝申请并提示拒绝原因和存在的问题;对于真实需求的情况自动审批,实现资源管理的自动化和申请自助化,节省了绝大部分的人工支持工作;

作者批注:业务再也不用担心 / 催促资源申请无人审批的问题了。

小米消息中间件的规划与愿景

万亿级消息背后: 小米消息队列的实践

最后谈一下小米中间件的规划与愿景,大致分为两个部分:

持续贴紧业务,价值输出

首先,我们需要做好重要特性赋能关键业务。例如 Transaction 落地电商金融等场景,Replication 落地云服务等场景。

其次,小米在电商、金融的发展越来越迅速,作为企业级消息系统,需要为此做好准备。我们希望未来能够打造金融级的分布式消息系统,为电商金融等场景提供靠谱的中间件能力;为此,业务对系统的高可靠、一致性、高可用、性能等方面都会提出更高的要求和挑战,比如高可用会需要真正的同城多活、异地多活等能力;

持续学习业界,建设小米中间件技术能力

消息中间件近年的发展开始慢慢引入算力,通过轻量化计算,使得消息系统从传输数据向理解数据发展,我们希望借此将小米的消息中间件打造成智能传输 + 计算的服务平台,为业务提供更好的平台化服务;

作为云原生时代架构的重要组成部分,消息中间件也需要持续跟进业界,进行 ServiceMesh 与 Serverless 架构的探索和升级;实际运维过程中,我们发现传统富客户端确实会带来各种各样的耦合问题,我们希望探索 Message Mesh 这种下一代消息传输架构,提升业务效率。

点个在看少个 bug

以上是关于万亿级消息背后: 小米消息队列的实践的主要内容,如果未能解决你的问题,请参考以下文章

基于消息队列 RocketMQ 的大型分布式应用上云最佳实践

基于消息队列 RocketMQ 的大型分布式应用上云最佳实践

深度 | 金融级消息队列的演进 — 蚂蚁金服的实践之路

干货 | 金融级消息队列的演进 — 蚂蚁金服的实践之路

滴滴出行千亿级消息队列炼成记!

网易云音乐:基于RocketMQ的亿级分布式消息队列系统建设实践