腾讯看点基于 Flink 构建万亿数据量下的实时数仓及实时查询系统

Posted 过往记忆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了腾讯看点基于 Flink 构建万亿数据量下的实时数仓及实时查询系统相关的知识,希望对你有一定的参考价值。

一、背景介绍

1. 需要解决的业务痛点

推荐系统

对于推荐同学来说,想知道一个推荐策略在不同人群中的推荐效果是怎么样的。

运营

对于运营的同学来说,想知道在广东省的用户中,最火的广东地域内容是哪些?方便做地域 push。

审核

对于审核的同学,想知道过去 5 分钟游戏类被举报最多的内容和账号是哪些,方便能够及时处理。

内容创作

对于内容的作者,想知道今天到目前为止,内容被多少个用户观看,收到了多少个点赞和转发,方便能够及时调整他的策略。

老板决策

对于老板来说,想知道过去 10 分钟有多少用户消费了内容,对消费人群有一个宏观的了解。

以上这几点都是我们日常工作中经常遇到的业务场景,后面的篇幅中会给出对应的解决方案。

2. 开发前调研

在进行开发之前我们做了如下这些调研。

■ 2.1 离线数据分析平台能否满足这些需求


调研的结论是不能满足离线数据分析平台,不行的原因如下:

  • 首先用户的消费行为数据上报需要经过 Spark 的多层离线计算,最终结果出库到 mysql 或者 ES 提供给离线分析平台查询。这个过程的延时至少是 3-6 个小时,目前比较常见的都是提供隔天的查询,所以很多实时性要求高的业务场景都不能满足。

  • 另一个问题是腾讯看点的数据量太大,带来的不稳定性也比较大,经常会有预料不到的延迟,所以离线分析平台是无法满足这些需求的。

■ 2.2 准实时数据分析平台


在腾讯内部提供了准实时数据查询的功能,底层技术用的是 Kudu + Impala,Impala 虽然是 MPP 架构的大数据计算引擎,并且访问以列式存储数据的 Kudu。但是对于实时数据的分析场景来说,它的查询响应速度和数据的延迟都还是比较高的。比如说查询一次实时的 DAU 返回结果的耗时至少是几分钟,无法提供良好的交互式的用户体验。

所以 Kudu+Impala 这种通用的大数据处理框架的速度优势,更多的是相比 Spark 加 HDFS 这种离线分析框架来说的,对于我们实时性要求更高的场景是无法满足的。因此需要进行开发,这就涉及到了方案选型和架构设计。

3. 腾讯看点信息流的业务流程

在大家介绍一下腾讯看点信息流的业务流程,了解了业务的流程,就能够更好的理解技术架构的方案。

  • 第 1 步,内容创作者发布内容;

  • 第 2 步,内容会经过内容审核系统启用或者下架;

  • 第 3 步,启用的内容给到推荐系统和运营系统,分发给 C 侧用户;

  • 第 4 步,内容分发给 C 侧用户之后,用户会产生各种行为,比如说曝光、点击举报等,这些行为数据通过埋点上报,实时接入到消息队列中;

  • 第 5 步,构建实时数据仓库;

  • 第 6 步,构建实时数据查询系统。

我们做的工作主要就在第 5 步和第 6 步,可以看一下我们的业务流程图来进一步的了解。

在业务流程图中,我们主要做的两部分工作,就是图中有颜色的这两部分:

  • 橙色部分,我们构建了一个腾讯看点的实时数据仓库;

  • 绿色部分,我们基于了 OLAP 的存储计算引擎,开发了实时数据分析系统。

为什么要构建实时数据仓库?因为原始的数据上报数据量非常大,一天上报的峰值就有上万亿条,而且上报的格式非常混乱,缺乏了内容的维度、信息用户的画像信息,下游就根本没有办法直接使用。

而我们提供的实时数据仓库,是根据腾讯看点信息流的业务场景,进行了内容维度的关联,用户画像的关联和各种粒度的聚合,下游可以非常方便的使用实时数据,而且实时数据仓库可以提供给下游的用户反复的消费使用,可以大量的减少重复的工作。

绿色部分的多维实时数据分析系统,消费了我们提供的实时数据仓库,利用了 OLAP 存储计算引擎,将海量的数据进行高效的存储,再提供高性能的多维实时分析功能。

二、架构设计

1. 设计的目标与难点

首先来看一下数据分析系统的设计目标与难点。我们的实时数据分析系统分为四大模块:

  • 实时计算引擎;

  • 实时存储引擎;

  • 后台服务层;

  • 前端展示层。

难点主要在于前两个模块,实时计算引擎和实时存储引擎。

  • 千万级每秒的海量数据如何实时的接入,并且进行极低延迟的维表关联是有难度的;

  • 实时存储引擎如何支持高并发的写入。高可用分布式和高性能的索引查询是比较难的,可以看一下我们的系统架构设计来了解这几个模块的具体实现。

2. 系统架构设计

关于系统架构的设计,主要从以下几方面来讲。

■ 2.1 实时计算


  • 接入层主要是从千万级每秒的原始消息队列中拆分出不同业务不同行为数据的微队列。拿 QQ 看点的视频内容来说,拆分过后的数据就只有百万级每秒了。

  • 实时计算层主要是负责多行行为流水数据进行 "行转列" 的操作,实时关联用户画像数据和内容维度数据。

  • 实时数仓存储层主要就是设计出符合看点的业务,下游好用的实时消息队列。

我们暂时提供了两个消息队列,作为实时数仓的两层:

  • 第一层是 DWM 层,它是内容 ID 和用户 ID 粒度聚合的,就是说一条数据包含了内容 ID 和用户 ID,然后还有 B 侧的内容维度数据,C 侧的用户行为数据,还有用户画像数据。

  • 第二层是 DWS 层,这一层是内容 ID 粒度聚合的,就是一条数据包含了内容 ID、B 侧数据和 C 侧数据。可以看到内容 ID 和用户 ID 粒度的消息,队列流量进一步减小到了 10 万级每秒,内容 ID 粒度更是减小到了万级每秒,并且格式更加清晰,维度信息更加丰富。

■ 2.2 实时存储

  • 实时写入层主要是负责 Hash 路由,将数据写入;

  • OLAP 存储层是利用 MPP 的存储引擎,设计出符合业务的索引和物化视图,高效存储海量数据;

  • 后台接口层是提供了高效的多维实时查询接口。

■ 2.3 后台服务

后台服务是基于腾讯自研的 RPC 后台服务框架写的,并且会进行一些二级缓存。

■ 2.4 前端服务

前端采用的是开源组件 Ant Design,利用了 nginx,反向代理了浏览器的请求到后台服务器上。

3. 方案选型

关于架构设计的方案选型,我们对比了业内的领先方案,最终选择了最符合我们业务场景的方案。

■ 3.1 实时数仓的选型

我们选择的是业内比较成熟的 Lambda 架构,它的优点是成熟度高,灵活性高,迁移成本低等等。但是它有一个缺点,实时和离线用了两套代码,可能会存在一个口径修改了数据,但另一个没有修改从而造成数据不一致的问题。我们的解决方案每天都有做数据对账的工作,如果有异常会进行告警。

■ 3.2 实时计算引擎的选型

我们选择了 Flink 作为实时计算引擎,是因为 Flink 在设计之初就是为了流处理来设计的,Sparks Streaming 严格来说还是微批处理,storm 现在用的已经不是很多了。并且, Flink 还有 exactly-once 的准确性,轻量级的容错机制,低延迟高吞吐,应用性高的特点,所以我们选择了 Flink 作为实时计算引擎。

■ 3.3 实时存储引擎

我们的要求是需要有维度索引,支持高并发的写入和高性能的多维实时 OLAP 查询。可以看到 HBase,TiDB 和 ES 都不能满足要求。Druid 有一个缺陷,它是按照时序划分 Segment,也就说明无法将同一个内容全部存放在同一个 Segment 上,所以在计算全局的 Top N 的时候就只能够计算近似值。于是我们选择了最近两年大火的 MPP 数据库引擎 Clickhouse,后面我会结合我们的具体使用场景和 Clickhouse 的内核原理,介绍一下 Clickhouse 的优势。

三、实时数仓

实时数仓也分为三块来介绍:

  • 第一是如何构建实时数仓;

  • 第二是实时数仓的优点;

  • 第三是基于实时数仓,利用 Flink 开发实时应用时候遇到的一些问题。

实时数仓这一部分的难度在于它处于一个比较新的领域,并且各个公司各个业务的差距都比较大,怎么样能够设计出方便好用,符合看点信息流业务场景的实时数仓是有难度的。

1. 如何构建实时数仓

先看一下实时数仓要做什么。实时数仓对外来说就是几个消息队列,不同的消息队列里面存放的是不同聚合粒度的实时数据,包括了内容 ID、用户 ID、C 侧用户行为数据,B 侧内容维度数据和用户画像数据等。搭建实时数仓可以分为三步。

■ 1.1数据清洗

首先从海量的原始消息队列中进行复杂的数据清洗操作,可以获得格式清晰的实时数据。它的具体操作其实就是在 Flink 的实时计算环节,先按照一分钟的粒度进行了窗口的聚合,在窗口内原本多行的行为数据被转成了一行多列的数据格式。

■ 1.2 高性能维表关联

第二步是进行高性能的实时维表关联,补充用户画像数据和内容维度数据等。但是海量的用户画像数据是存在于 HDFS 上的,内容维度数据又是存在于 HBase 上的,所以想要极低延迟的维表关联是有技术挑战的。这一块在后文会单独介绍。

■ 1.3 不同粒度聚

第三步是将算好的实时数据按照不同的粒度进行聚合,然后放到对应的消息队列中进行保存,可以提供给下游多用户复用,到这里实时数仓就搭建完成了。

接下来详细介绍一下第二步中高性能实时维表关联是怎么处理的。

几十亿的用户画像数据存放在 HDFS 上,肯定是无法进行高性能的维表关联的,所以需要进行缓存。由于数据量太大,本地缓存的代价不合理,我们采用的是 Redis 进行缓存,具体实现是通过 Spark 批量读取 HDFS 上的画像数据,每天更新 Redis 缓存,内容维度数据存放在 HBase 中。

为了不影响线上的业务,我们访问的是 HBase 的备库,而且由于内容维度变化的频率远高于用户画像,所以维度关联的时候,我们需要尽量的关联到实时的 HBase 数据。

一分钟窗口的数据,如果直接关联 HBase 的话,耗时是十几分钟,这样会导致任务延迟。我们发现 1000 条数据访问 HBase 是秒级的,而访问 Redis 的话只是毫秒级的,访问 Redis 的速度基本上是访问 HBase 的 1000 倍,所以我们在访问 HBase 的内容之前设置了一层 Redis 缓存,然后通过了监听 HBase-proxy 写流水,通过这样来保证缓存的一致性。

这样一分钟的窗口数据,原本关联内容维度数据耗时需要十几分钟,现在就变成了秒级。我们为了防止过期的数据浪费缓存,缓存的过期时间我们设置成了 24 个小时。

最后还有一些小的优化,比如说内容数据上报过程中会上报不少非常规的内容 ID,这些内容 ID 在 HBase 中是不存储的,会造成缓存穿透的问题。所以在实时计算的时候,我们直接过滤掉这些内容 ID,防止缓存穿透,又减少了一些时间。另外,因为设置了定时缓存,会引入一个缓存雪崩的问题,所以我们在实时计算的过程中进行了削峰填谷的操作,错开了设置缓存的时间,来缓解缓存雪崩的问题。

2. 实时数仓的优点

我们可以看一下,在我们建设实时数仓的前后,开发一个实时应用的区别。

没有数仓的时候,我们需要消费千万级每秒的原始队列,进行复杂的数据清洗,然后再进行用户画像关联、内容维度关联,才能够拿到符合要求格式的实时数据。开发和扩展的成本都会比较高。如果想开发一个新的应用,又要走一遍流程。现在有了实时数仓之后,如果再想开发一个内容 ID 粒度的实时应用,就直接申请 TPS 万级每秒的 DWS 层消息对列即可,开发成本变低很多,资源消耗小了很多,可扩展性也强了很多。

我们看一个实际的例子,开发我们系统的实时数据大屏,原本需要进行如上的所有操作才能够拿到数据,现在只需要消费 DWS 层消息队列写一条 Flink SQL 即可,仅仅会消耗 2 个 CPU 核心和 1GB 的内存。以 50 个消费者为例,建立实时数仓的前后,下游开发一个实时应用,可以减少 98% 的资源消耗,包括了计算资源、存储资源、人力成本和开发人员的学习接入成本等等,并且随着消费者越多节省的就越多,就拿 Redis 存储这一部分来说,一个月就能够省下上百万的人民币。

3. Flink 开发过程中遇到的问题总结

在利用 Flink 开发实时应用的过程中遇到过不少问题,这里选择几个比较有代表性的给大家分享一下。

■ 3.1 实时数据大屏

第一个是开发实时数据大屏的时候,开始是通过 Flink SQL 来实现的,功能非常简单,就是计算当天截止到当前累计的点击数,实现的方式也非常简单,输入的 source table 是实时数据仓库的消息队列。输出的 sink table 就是 Redis。SQL 就是:select sum(click) from sourceTable Group by day time。

这个任务看起来是没有问题的,但是实际跑起来数据却无法实时更新,是因为 source table 每到达一条点击数据,累计值都会加一,然后就会往 Redis 中写一条最新的数据。所以当数据量太大的时候,它就会频繁的写 Redis,所以这样就会导致写 Redis 的网络延迟会显得非常高,从而会导致背压数据无法实时更新。

我们做了一个简单的优化,用 table API 执行完 SQL 之后,转化成 DataStream,然后通过一个一秒钟的数据窗口,每秒钟仅仅会输出最新的累计值到 Redis 中,这样的数据就可以实时更新了。

■ 3.2 Flink state 的 TTL

Flink 的 1.6 版本开始引入了 state TTL,开启了 state TTL 之后,Flink 就会为每一个 keyed state 增加一个时间戳字段,通过时间戳字段就可以判断 state 是不是过期,是否需要进行清理。但是如果仅仅从字面意思上理解就会遇到一些问题,在 1.10 版本之前,虽然开启了 state TTL,但是 Flink 默认是不会自动清理过期的 state 的。所以如果是 heap memory backend,就会导致 OOM 的问题;如果是 rocksDB backend,就会导致 state 的状态越来越大,最终会导致重启的时候耗费的时间过长。后面经过调研,我们发现有两种方式可以清理 Flink 的过期的 state。

第一种是手动清理,第二种的话是自动清理。我们最终选择的是以手动触发的方式来清理过期的 state。每天在深夜,也就是业务低谷期的时候,我们会对 state 中的数据进行遍历的访问,访问到过期的数据,就会进行清理。

为什么我们没有选择 Flink 的自动清理策略,是因为 Flink 在 1.8 版本之前,只有一种自动清理策略,clean up in full snapshot。这种清理策略从名字上来看就知道他是在做全量 snapshot 的时候会进行清理,但是有一个致命的缺陷,它并不会减少本身 state 的大小,而是仅仅把清理过后的 state 做到 snapshot 里面,最终还是会 OOM。并且,它重启之后才能够加载到之前清理过的 state,会导致它频繁的重启。

虽然在 1.8 版本之后,增加了两种自动清理的策略,但是因为它是异步清理,所以他的清理时机和使用方式都不如手动清理那么灵活,所以最终我们还是选择了手动触发的方式进行清理。在 1.10 版本之后,默认是选择了自动清理的策略,但是这就要求用户对自动清理策略的时机和策略 有比较好的了解,这样才能够更好的满足业务的需求。

■ 3.3 使用 Flink valueState 和 mapState 经验总结


虽然通过 valueState 也可以存储 map 结构的数据,但是能够使用 mapState 的地方尽量使用 mapState,最好不要通过 valueState 来存储 map 结构的数据,因为 Flink 对 mapState 是进行了优化的,效率会比 valuState 中存储 map 结构的数据更加高效。

比如我们遇到过的一个问题就是使用 valueState 存储了 map 结构的数据,选择的是 rocksDB backend。我们发现磁盘的 IO 变得越来越高,延迟也相应的增加。后面发现是因为 valueState 中修改 map 中的任意一个 key 都会把整个 map 的数据给读出来,然后再写回去,这样会导致 IO 过高。但是 mapState,它每一个 key 在 rocksDB 中都是一条单独的 key,磁盘 IO 的代价就会小很多。

■ 3.4 Checkpoint 超时问题


我们还遇到过一些问题,比如说 Checkpoint 超时了,当时我们第一个想法就是计算资源不足,并行度不够导致的超时,所以我们直接增加了计算资源,增大了并行度,但是超时的情况并没有得到缓解。后面经过研究才发现是数据倾斜,导致某个节点的 barrier 下发不及时导致的,通过 rebalance 之后才能够解决。

总的来说 Flink 功能还是很强的,它文档比较完善,网上资料非常丰富,社区也很活跃,一般遇到问题都能够比较快的找到解决方案。

四、实时数据查询系统

我们的实时查询系统,多维实时查询系统用的是 Clickhouse 来实现的,这块分为三个部分来介绍。第一是分布式高可用,第二是海量数据的写入,第三是高性能的查询。

Click house 有很多表引擎,表引擎决定了数据以什么方式存储,以什么方式加载,以及数据表拥有什么样的特性?目前 Clickhouse 拥有 merge tree、replaceingMerge Tree、AggregatingMergeTree、外存、内存、IO 等 20 多种表引擎,其中最体现 Clickhouse 性能特点的是 merge tree 及其家族表引擎,并且当前 Clickhouse 也只有 merge 及其家族表引擎支持了主键索引、数据分区、数据副本等优秀的特性。我们当前使用的也是 Clickhouse 的 merge tree 及其家族表引擎,接下来的介绍都是基于引擎展开的。

1. 分布式高可用

先看分布式高可用,不管单节点的性能多强,随着业务的增长,早晚都会有遇到瓶颈的一天,而且意外的宕机在计算机的运行中是无法避免的。Clickhouse 通过分片来水平扩展集群,将总的数据水平分成 m 分,然后每个分片中保存一份数据,避开了单节点的性能瓶颈,然后通过副本即每个分片拥有若干个数据一样的副本来保障集群的高可用。

再看看 Clickhouse 默认的高可用方案,数据写入是通过分布式表写入,然后分布式表会将数据同时写入到同一个分片的所有副本里面。这里会有一个问题,如果副本 0 写入成功,副本 1 写入失败,那么就会造成同一个分片的不同副本数据不一致的问题,所以默认的高可用方案是不能够用于生产环境的。

我们这里听取的是 Clickhouse 官方的建议,借助了 Zookeeper 实现高可用的方案,数据写入一个分片的时候,仅仅写入一个副本,然后再写 Zookeeper,通过 Zookeeper 告诉同一个分片的其他副本,再过来拉取数据,保证数据的一致性。

接下来看一下 Clickhouse 实现这种高可用方案的底层原理,这种高可用的方案需要通过 Clickhouse 的 replicated merge tree 表引擎来实现,其中在 replicated merge tree 表引擎的核心代码中,有大量跟 Zookeeper 进行交互的逻辑,从而实现了多个副本的协同,包括主副本的选举写入任务队列的变更和副本状态的变化等等。可以看到外部数据写入 Clickhouse 的一个分片,会先写入一个副本的内存中,在内存中按照指定的条件排好序,再写入磁盘的一个临时目录。最后将临时目录重命名为最终目录的名字,写完之后通过 Zookeeper 进行一系列的交互,实现数据的复制。

这里没有选用消息队列进行数据的同步,是因为 Zookeeper 更加轻量级,而且写的时候任意写一个副本,其他的副本都能够通过读 Zookeeper 获得一致性的数据,而且就算其他节点第一次来获取数据失败了,后面只要发现它跟 Zookeeper 上的数据记录不一致,就会再次尝试获取数据,保证数据的一致性。

2. 海量数据的写入

■ 2.1 Append + Merge


数据写入遇到的第一个问题是海量数据直接写 Clickhouse 是会失败的。Clickhouse 的 merge tree 家族表引擎的底层原理类似于 LSM tree,数据是通过 append 的方式写入,后续再启动 merge 线程,将小的数据文件进行合并。了解了 Clickhouse merge tree 家族表引擎的写入过程,我们就会发现以下两个问题。

  • 如果一次写入的数据太少,比如一条数据只写一次,就会产生大量的文件目录。当后台合并线程来不及合并的时候,文件目录的数量就会越来越多,这会导致 Clickhouse 抛出 too many parts 的异常,写入失败。

  • 另外,之前介绍的每一次写入除了数据本身,Clickhouse 还会需要跟 Zookeeper 进行 10 来次的数据交互,而我们知道 Zookeeper 本身是不能够承受很高的并发的,所以可以看到写入 Clickhouse QPS 过高,导致 zookeeper 的崩溃。

我们采用的解决方案是改用 batch 的方式写入,写入 zookeeper 一个 batch 的数据,产生一个数据目录,然后再与 Zookeeper 进行一次数据交互。那么 batch 设置多大?如果 batch 太小的话,就缓解不了 Zookeeper 的压力;但是 batch 也不能设置的太大,要不然上游的内存压力以及数据的延迟都会比较大。所以通过实验,最终我们选择了大小几十万的 batch,这样可以避免了 QPS 太高带来的问题。

其实当前的方案还是有优化空间的,比如说 Zookeeper 无法线性扩展,我有了解到业内有些团队就把 Mark 和 date part 相关的信息不写入 Zookeeper。这样能够减少 Zookeeper 的压力。不过这样涉及到了对源代码的修改,对于一般的业务团队来说,实现的成本就会比较高。

■ 2.2 分布式表写入

如果数据写入通过分布式表写入会遇到单点的磁盘问题,先介绍一下分布式表,分布式表实际上是一张逻辑表,它本身并不存储真实的数据,可以理解为一张代理表,比如用户查询分布式表,分布式表会将查询请求下发到每一个分片的本地表上进行查询,然后再收集每一个本地表的查询结果,汇总之后再返回给用户。那么用户写入分布式表的场景,是用户将一个大的 batch 的数据写入分布式表,然后分布式表示按照一定的规则,将大的 batch 的数据划分为若干个 mini batch 的数据,存储到不同的分片上。

这里有一个很容易误解的地方,我们最开始也是以为分布式表只是按照一定的规则做一个网络的转发,以为万兆网卡的带宽就足够,不会出现单点的性能瓶颈。但是实际上 Clickhouse 是这样做的,我们看一个例子,有三个分片 shard1,shard2 和 shard3,其中分布式表建立在 shard2 的节点上。

  • 第一步,我们给分布式表写入 300 条数据,分布式表会根据路由规则把数据进行分组,假设 shard1 分到 50 条,shard2 分到 150 条,shard3 分到 100 条。

  • 第二步,因为分布式表跟 shard2 是在同一台机器上,所以 shard2 的 150 条就直接写入磁盘了。然后 shard1 的 50 条和 shard3 的 100 条,并不是直接转发给他们的,而是也会在分布式表的机器上先写入磁盘的临时目录。

  • 第三步,分布式表节点 shard2 会向 shard1 节点和 shard3 节点分别发起远程连接的请求,将对应临时目录的数据发送给 shard1 和 shard3。

这里可以看到分布式表所在的节点 shard2 全量数据都会先落在磁盘上,我们知道磁盘的读写速度都是不够快的,很容易就会出现单点的磁盘性能瓶颈。比如单 QQ 看点的视频内容,每天可能写入百亿级的数据,如果写一张分布式表,很容易就会造成单台机器出现磁盘的瓶颈,尤其是 Clickhouse 的底层运用的是 merge tree,它在合并的过程中会存在写放大的问题,这样会加重磁盘的压力。

我们做的两个优化方案:

  • 第一个就是对磁盘做了 RAID 提升了磁盘的 IO;

  • 第二就是在写入之前,上游进行了数据的划分分表操作,直接分开写入到不同的分片上,磁盘的压力直接变为了原来的 n 分之一,这样就很好的避免了磁盘的单点的瓶颈。

■ 2.3 局部 Top 并非全局 Top

虽然我们的写入是按照分片进行了划分,但是这里引入了一个分布式系统常见的问题,就是局部的 Top 并非全局 Top。比如说同一个内容 x 的数据落在了不同的分片上,计算全局 Top100 点击内容的时候,之前说到分布式表会将查询请求下发到各个分片上,计算局部的 Top100 点击的内容,然后将结果进行汇总。

举个例子,内容 x 在分片一和分片二上不是 Top100,所以在汇总数据的时候就会丢失掉分片一和分片二上的内容 x 的点击数据。

第二是会造成数据错误,我们做的优化就是在写入之前加上了一层路由,我们将同一个内容 ID 的数据全部路由到了同一个分片上,解决了该问题。这里需要多说一下,现在最新版的 Clickhouse 都是不存在这样这个问题的,对于有 group by 和 limit 的 SQL 命令,只把 group by 语句下发到本地表进行执行,然后各个本地表执行完的全量结果都会传到分布式表,在分布式表再进行一次全局的 group by,最后再做 limit 的操作。

这样虽然能够保证全局 top N 的正确性,但代价就是牺牲了一部分的执行性能。如果想要恢复到更高的执行性能,我们可以通过 Clickhouse 提供的 distributed_group_by_no_merge 参数来选择执行的方式。然后再将同一个内容 ID 的记录全部路由到同一个分片上,这样在本地表也能够执行 limit 操作。

3. 高性能的存储和查询

Clickhouse 高性能查询的一个关键点,就是稀疏索引。稀疏索引这个设计很有讲究,设计的好可以加速查询,设计的不好反而会影响查询效率。因为我们的查询大部分都是时间和内容 ID 相关的,比如说某个内容过去 n 分钟在各个人群的表现如何,我按照日期分钟粒度时间和内容 ID 建立了稀疏索引,针对某个内容的查询,建立稀疏索引之后,可以减少 99% 的文件扫描。

Clickhouse 高性能查询的第二点,就是我们现在的数据量太大,维度太多,拿 QQ 看点的视频内容来说,一天入库的流水就有上百亿条,有些维度有几百个类别,如果一次性把所有的维度进行预聚合查询反而会变慢,并且索引会占用大量的存储空间。我们的优化就是针对不同的维度建立对应的预聚合和物化视图,用空间换时间,这样可以缩短查询的时间。

举个例子,通过 summary merge tree 建立一个内容 ID 粒度聚合的累积,累加 pv 的物化视图,这样相当于提前进行了 group by 的计算,等真正需要查询聚合结果的时候,就直接查询物化视图,数据都是已经聚合计算过的,且数据的扫描量只是原始流水的千分之一。

分布式表查询还会有一个问题,就是查询单个内容 ID 的时候,分布式表会将查询请求下发到所有的分片上,然后再返回给查询结果进行汇总。实际上因为做过路由,一个内容 ID 只存在于一个分片上,剩下的分片其实都是在空跑。针对这类的查询,我们的优化就是后台按照同样的规则先进行路由,然后再查询目标分片,这样减少了 n 分之 n -1 的负载,可以大量的缩短查询时间。而且由于我们提供的是 OLAP 的查询,数据满足最终的一致性即可。所以通过主从副本的读写分离,也可以进一步的提升性能。我们在后台还做了一个一分钟的数据缓存,这样针对相同条件的查询,后台就可以直接返回。

4. Clickhouse 扩容方案

我们调研了业内一些常见的方案:

  • 比如说 HBase 原始数据是存放在 HDFS 上的,扩容只是 region server 的扩容,并不涉及到原始数据的迁移。

  • 但是 Clickhouse 的每个分片数据都是在本地,更像是 RocksDB 的底层存储引擎,不能像 HBase 那样方便的扩容。

  • 然后是 Redis,Redis 是 Hash 槽这一种,类似于一致性 Hash 的方式,是比较经典的分布式缓存方案。

Redis slot 在 Hash 的过程中,虽然会存在短暂的 ASK 不可用,但是总体上来说迁移还是比较方便的。就从原来的 h0 迁移迁移到 h1,最后再删除 h0,但是 Clickhouse 大部分都是 OLAP 的批量查询,而且由于列式存储不支持删除的特性,一致性 hash 的方案也不是很合适。

我们目前的扩容方案就是从实时数仓另外消费一份数据写入新的 Clickhouse 集群,两个集群一起跑一段时间,因为实时数据我们现在就保存了三天,等三天之后,后台服务就直接访问新的 Clickhouse 集群。

五、实时系统应用成果总结

我们输出了腾讯看点的实时数据仓库,DWM 层和 DWS 层两个消息队列,上线了腾讯看点的实时数据分析系统,该系统能够亚秒级的响应多维条件查询请求。在未命中缓存的情况下:

  • 过去 30 分钟的内容查询,99% 的请求耗时在一秒内;

  • 过去 24 小时的内容查询 90% 的请求耗时在 5 秒内,99% 的请求耗时在 10 秒内。

以上是关于腾讯看点基于 Flink 构建万亿数据量下的实时数仓及实时查询系统的主要内容,如果未能解决你的问题,请参考以下文章

日均20万亿次计算量!腾讯基于Flink的实时流计算平台演进之路丨附PPT下载

基于Flink构建实时数据仓库

基于Flink构建实时数仓实践

基于Flink构建企业级实时数仓(附项目源码)

快手基于 Flink 构建实时数仓场景化实践

快手基于 Flink 构建实时数仓场景化实践