ClickHouse 在有赞的实践

Posted 过往记忆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ClickHouse 在有赞的实践相关的知识,希望对你有一定的参考价值。

分享嘉宾:陈琦(前) 有赞,编辑整理:刘鹏鹏 滴滴出行,出品平台:DataFunTalk

导读:有赞是什么?有赞是一家商家服务公司。致力于成为商家服务领域里最被信任的引领者;并持续做一个Enjoy的组织。主要的业务有:① SaaS服务:有赞微商城,有赞零售,有赞连锁,有赞美业,有赞小程序;② Paas云服务:面向第三方开发者的有赞云;③ 支付业务。 

本次分享的题目为ClickHouse在有赞的实践,主要介绍:

  • OLAP在有赞的发展

  • ClickHouse在有赞的平台化工具建设

  • ClickHouse在有赞的应用

  • 未来规划和一些探索

嘉宾GitHub网址:https://github.com/kaka11chen

01

OLAP在有赞的发展

1. 有赞OLAP发展

有赞在2018年引入了Presto以解决离线数据的交互式分析问题。Presto采用的是MPP架构,全内存 + pipeline的技术实现,比较适合交互式分析。我们主要将Presto应用在临时查询、BI报表、元数据的分析等场景。

在2019年上半年我们引入了Druid。在没有Druid之前,面对实时的场景我们只能通过Flink或者Storm去预计算好结果,将结果直接存储在 KV (如Redis、HBase)里面然后再进行查询,这种方式开发起来非常地繁琐,因此我们引入了Druid来完成数据的实时分析。Druid的原理是预计算最细粒度的聚合,比如说我有三个维度A、B、C,Druid会预计算A、B、C三个维度再加上TimeStamp的粒度,然后整个预计算这些结果,查询的时候再通过这份结果进行查询,这种方式可以减少维度爆炸的问题,相对来说是一种比较平衡的方式。应用场景主要包括实时监控、实时分析、实时数据产品等。

2019年的下半年我们也同时引入了Kylin,主要应用在对于一些精确度要求比较高,性能要求也比较高的离线数据分析场景比如商家后台的一些场景。Kylin的实现原理是完全预聚合和立方体,它会把各种组合都在HBase里面算出来。当然现在Kylin4的版本也慢慢走向了不是所有的聚合结果都帮你算出来,我们这边也在引入Kylin4,并且提交了一些patch。

2020年我们引入了ClickHouse,主要用来进行实时数据分析。ClickHouse的原理是明细动态聚合查询,因为以前Druid都是帮你预聚合好,我们没办法直接通过明细数据进行查询。此外ClickHouse还能通过物化视图做到类似Druid的预聚合的功能,虽然物化视图相对来说比较简单一点,这个后面也会讲一下。我们的应用场景包括SCRM、DMP、CDP、直播分析还有一些日志指标分析等。

2. 有赞OLAP发展和选型

这里主要对以上几款产品在技术、延时、SQL支持程度、生产数据成本、支持Join、去重方式等方面的表现进行了对比。

Presto:技术方面,Presto采用的MPP系统和SQL On Hadoop;Presto的延时是天/小时级别,虽然现在数据湖IceBerg、HuDi比较火,他们希望把它达到分钟级别,但是就目前来看还没到很成熟大规模使用的阶段;Presto的查询延迟一般,因为它是从明细层开始查询,没有任何预聚合;SQL支持程度还是比较完善的;因为没有预聚合,数据生产成本也比较低;Join也支持的比较好;去重的话也支持普通精确去重。

Druid:Druid采用了一些如位图索引、字符串编码、预聚合等的技术,刚才也讲过它只预聚合最细的维度组合,这样可以防止维度爆炸,但是会牺牲一点RT(响应时间),因此做了一个权衡;支持实时;查询延迟相对Presto会低很多;SQL支持的相对完善,但是没有Presto那么完善;Druid会做一部分的预聚合,自然需要一些成本;新版本开始慢慢准备支持Join了,但还不成熟,维度表的Lookup是一直支持的;去重方式采用的是HyperLogLog,快手我看到也有第三方的contributing去支持BitMap去重,但是这个我也没有深入调研过,这边就不多说了。

Kylin:Kylin采用的技术是完全预聚合的立方体,至少Kylin4之前的版本是这样的;它是把结果存到HBase,微批量的延迟;查询延迟也非常低,因为直接从HBase里面去做相应的结果查询,如果一些比较轻微的聚合可以通过HBase的Coprocessor去做一些比较轻微的一些聚合;SQL支持程度也比较完善,生产成本由于要做预聚合的立方体成本就比较高,生产成本一高的话灵活性就会变得很差,比如修改一些字段,重刷数据成本就会比较高;Join也是支持的;Kylin有一个比较好的一个事情是可以做到BitMap去重这个会是一个比较大的优势。

ClickHouse:ClickHouse采用的技术是明细动态聚合查询,当然也类似于Druid可以用一些物化视图,做一些预聚合的表,再通过预聚合的表进行查询;支持实时,明细查询延时还是比较低的,对比Presto、Impala包括对比Apache Doris,ClickHouse单表的查询性能确实很高;Join在一些情况下性能不佳,因为Join没有Exchange或者Shuffle Join,物化视图是预聚合的所以性能会更好一点;SQL支持只能说较完善,没有Presto和Kylin那么完善;生产数据如果是物化视图的话,会聚合一下,因此还是会有一些成本;Join刚才也讲了只能有限支持,比如说没有shuffle join,当数据量比较大的情况下,性能就不太佳甚至会查不出来,语法解析也不是很好,有时候比如说左边是个Table右边是个SubQuery这种,但是换一换可能性能就会更差;去重方式支持精确去重和HyperLogLog,但是没有支持BitMap去重。

3. ClickHouse特性

ClickHouse主要具有以下特性:

  • 灵活,支持明细数据SQL查询,并用物化视图加速。

  • 多核(垂直扩展),可以在一台机器上使用多线程去进行查询;分布式处理,它有不同的分片,这样的话可以进行水平扩展,MPP架构。

  • 支持实时批量数据摄入。

  • 列式存储、向量化引擎、代码编译生成。向量化引擎和代码编译生成基本是为了解决算子瓶颈,如果不通过这些技术的话一般是个火山模型,火山模型会有一些虚函数以及分支判断之间的一些开销。通过这两种方法,向量化可以去平摊开销,代码编译可以把它转成以数据为中心进而消除开销。但是这两种方法也不是万能的,比如说当Aggregation或者Join数据量比较大时候需要物化到内存,物化到内存的时候瓶颈也就产生了,因此也不会有非常大的性能争议。

  • 主键索引,ClickHouse会按照用户设置的主键进行排序,ClickHouse中MergeTree的文件就是按照这个逐渐进行排序的,Bloom Filter、minmax等做了二级索引。

当然,ClickHouse也不是万能的,它在以下方面并不擅长:

  • 没有高速,低延迟的更新和删除方法。不擅长单行数据,行级别数据的更新删除方法一般都是异步进行。

  • 稀疏索引使得点查性能不佳。点查没办法用ClickHouse,最好的是用KV类型的Redis或者HBASE。

  • 不支持事务。尽管事务现在对OLAP也会有一些用途,但是不是非常大的用途。

ClickHouse的应用场景:

  • 用户行为分析,精细化运营分析:日活、留存率分析、路径分析、有序漏斗转化率分析、Session分析等。

  • 实时日志分析,监控分析,实时数仓。

4. ClickHouse原理简介

很多人接触ClickHouse都会问的一个问题,ClickHouse为什么会快?

计算方面:ClickHouse采用的是自底而上的设计,极度关注性能,有各种优化。如Aggregator,一般Aggregator没有这么夸张的优化,更多的是对Int数据的Hash表的优化,其他类型可能就用一样的Hash表。ClickHouse对不同的数据类型有不同的HashMap。前面的一些情况也是它的原因,比如向量化引擎和代码编译生成,向量化确实能够提高,代码编译生成一般情况下大部分的数据也能提高,比如在表达式生成这种情况也会用代码编译生成。

存储方面:采用的是MergeTree,一般我们写ClickHouse都会按批次写,就是一个批次Insert过去,然后会形成part(partition)文件,假如只有一个分区,就形成一个part文件,part文件是按照主键进行排序的其内部有序,ClickHouse后天会默默地把这些文件进行合并,有点像LSM-Tree。主键包括以下几个部分,首先有一个primary.idx是它是主键索引,该索引是稀疏索引而非稠密索引,这样的好处是可以把稀疏索引放到内存中性能会更佳,而且OLAP不是OLTP,它更多的是聚合计算,所以瓶颈更多是在聚合计算的算子那里,但是比如很多小查询那就不一定了。然后主键索引会去找到它相应的.mkr文件,是跟主键索引是一一对应的。mkr文件记载主键索引的比如说行号。后面是数据文件bin,记载了这两个文件之间的offset。整体流程是先通过主键,然后找到MKR,然后再找到bin的offset。中间还有些压缩之类的东西,会复杂一点,这里就先简单讲一下。

02

ClickHouse在有赞的平台化工具建设

1. ClickHouse集群部署

我们这边大数据平台是一个中台部门,所以更希望是我们这边把整个大数据组件给包装起来统一管理,业务方只需要使用包装后的系统。

上层是应用场景,刚才有讲SCRM、DMP、CDP、企业微信助手。

应用场景这块,我们在最前端会有个Load Balance,目前我们用的是LVS,其实HTTP代理也行。但是因为我们的统一全权代理会有一些限制,因此我们现在选用的是LVS会比较方便一点。

第二层Proxy部分,目前我们用的是Apisix代理网关,该网关是基于nginx的,Nginx现在有一个基于OpenResty的Apisix的代理网关可以用来做一些网关或者边缘网关。我们用它来做一些熔断、限流、安全、日志这些功能。用起来也比较方便,DMP这边我们还实现了一个自定义插件,这个后续会讲。

下面一层是分布式表,分布式表之后就是每一个分片,每一个分片都有两台副本。比如说这边三个分片的话就是3x2共6台机器。Zookeeper的话我们是用SSD单独部署的,因为ClickHouse在副本复制的时候需要Zookeeper去协调。上图画的不是那么的准,在分布式表跟分片之间我们是部署在同一些机器上面的,这样比较省成本。

2. ClickHouse写入

ClickHouse写入部分,离线我们一般是通过Spark将Hive表导到ClickHouse里面,还有通过Flink将Kafka的数据进行导入。需要注意的是必须批量写入,原因刚才也讲过,因为ClickHouse每次写入会根据partition形成parts文件,如果一条数据写一个parts文件的话会合并不过来,因为后面合并的话就会看到合并比插入都慢的那种错误,官方推荐是用批量方式写。

后面就是写本地表,读分布式表。ClickHouse也可以写分布式表,但是对Zookeeper的性能压力会比较大,整个性能也会比较差。我们有两种写入方式,一种是Random一种是Hash,Hash是为了后面DMP系统特殊设计的,这个后面会讲。我们Spark用的WaterDrop,是比较出名的一个Spark的工具,我们进行了自己修改一些代码定制。第一步是通过cluster name获得分片的信息,我们是直接通过apisix代理获得,apisix会写一个插件获得相应挂在后端的某一个ClickHouse Cluster分片的路径,获得之后我们就用Hash或者随机的方式插入数据,Flink也是类似的。

上图展示的是我们离线导入和实时导入部分的。

  • 离线导入:我们的数据平台里面有一类任务需要离线导入,用户可以直接选一张Hive表或者ClickHouse的目标表去导入。

  • 分片策略:如果是Hash的话会需要采用分片策略。

  • 实时导入:我们写了Flink的ClickHouse Connector,基于JDBC根据刚才那些要素进行修改。并且它不仅SQL可以用这个Connector,SDK也能用这个Connector。

3. ClickHouse离线读写分离

我们做好这套系统后发现有一个痛点就是业务场景很多都写入量巨大,写多读少,离线会导入很多Hive表。很多公司其实做一些产品出来,查询QPS都没有很高,因为产品刚刚上线需要寻找业务爆发点,没有办法一下子读很多,但是写入量会很大。这种情况的话,我们针对写入量只能扩分片来提高写入吞吐,其实就是一种读写不分离的方法,所以资源会比较浪费。

去年的话我看了那个QQ音乐的,他们也有这个思路去做ClickHouse读写分离,我们这边也就是同样去做了一下。一般来说可行方案有两种,一种就是用k8s ch临时集群来写入。可以用一个k8s集群,每次插入的时候用一个临时的k8s集群构建起来,然后数据直接写到临时集群,等插完之后再把相应的数据文件直接弄到线上集群。第二种方法是使用Spark构建数据文件。这种方法是可行的,但是相对来说复杂度会高一点。我们采用的是k8s方案,因为比较容易实现。

ClickHouse的基本工作流程如上图所示:

我们先讲一下架构,我这边会设计一个Master节点和一个StandBy Master节点去管理这些组件,包括高可用、扩缩容这些。DataImportJob启动起来之后通过WaterDrop直接导入到k8s临时写入集群,再导入某张表某个分区的时候会去启动一个临时的k8s集群。集群里面我们用的是官方的ch-operator,它除了本身的ClickHouse server服务,我们还自己写了一个叫CHSegmentPusher,字面上理解就是当数据导入临时集群之后,要把数据push到中间存储(HDFS)里面去。相应的ClickHouse集群这边我们也有相应的puller部署在正式集群的每个节点上,puller去从HDFS上取这个数据。

右侧是Apache Helix,apache Helix Framework是基于Zookeeper去帮你管理集群心跳、存活、高可用、扩缩容等。包括这个分布式任务队列,也是用zk实现的。其实这个架构有点像Druid那个segment加载,有了解Druid的小伙伴就比较清楚。

还有一些细节,比如说Helix里面我们自己去实现一个balance来保证相同分片的replicas形成一个partition。因为一定要这么做。

03

ClickHouse在有赞的应用案例

1. DMP人群画像系统

第一个是DMP人群画像系统,概括来说就是一个正交导入的方式。我们DMP人群画像一般业务都要去进行人群圈选,通过一些tag去圈选出一些人群来进行营销活动或者是预估人群的数目。如果把这些明细数据直接放到ClickHouse里面的话性能就会比较差,所以业界通常的做法是把它转成位图,这部分查询的性能就会相对高一点。我们的标签用户位图表基本上就是这样设计的,首先是tag_id、tag_value、uid_bitmap、uid_bitmap_offset,比如说我性别是男,就有一个标签uid_bitmap。至于为什么会有offset,是因为ClickHouse的bitmap只支持32位,如果ID大于32位怎么办呢?我们采用offerset让不同的bitmap来解决这样的问题。在插入数据的时候,无论是实时还是批量数据我们都会通过UserID进行哈希,这样的话某一个UID会在某一个分片上固定下来不会到别的分片,在性能方面这是一种最高效的做法。如果不采用这种方式,我们也试过用分布式表,结果可能三台机器比单台还慢,这肯定是接受不了的,因为没有办法横向扩展或者纵向扩展来提高性能,因此我们采用的正交哈希写入。一旦正交了之后,只要在自己的机器上面去处理本地的SQL,处理完之后直接将结果进行合并就行,甚至不用merge。比如说是人群的ID,那每个符合tag的人群的ID直接返回这个分片,然后另外一个也返回,然后他们把人群id叠加就行了,人数只要求个和就行了不用merge。这件事情我们不想让用户自己去客户端去实现,所以我们有apisix写了一个Customer Plugins,Customer Plugins去做这个事情,当你发了一个查询请求过来之后,会通过你的查询请求去不同的分片上面去并发进行查询,然后结果按照刚才说的进行合并,让用户感到透明。但这个只针对静态标签,或者是可枚举的一些静态标签。用户还有一些实时标签是不可枚举的,这部分现在是用的用户行为表,业务方直接用用户行为表明细数据进行查询,查询完之后的结果转换之后再跟刚才的静态预计算位图进行位图方面的计算最后得到结果。这个确实是有隐患的,因为没有预计算如果量很大的情况下可能会比较慢。

2. SCRM商家会员管理系统

SCRM系统是一个比较灵活的商家分析系统,它维度可变,状态回溯;支持动态圈选;支持跨店、跨天去重。目前Druid做不了这个事情,所以说我们用的ClickHouse。

3. 天网日志监控TopN系统

我们公司有一个日志分析软件叫天网,全公司的应用层日志全部都存在这个系统里面,还有一些监控的TopN,错误日志TopN、商家限流TopN、服务吞吐量TopN、特性/功能TopN。一开始这个技术是用Druid实现的,我们最近把它切换成了Flink + ClickHouse,并且使用物化视图进行预聚合,整体效果还是不错的。数据量也是比较大的,每秒100w行数据的Kafka写入QPS。写入QPS还是比较高的,换成ClickHouse这套方案可以比Druid节省60%的机器而且性能也大大提升。这个系统还有个特点就是查询QPS没有那么高,因为毕竟内部使用,不是ToC的。

04

未来规划和一些探索

最后我讲一下ClickHouse在有赞的未来规划和一些探索:

1. ClickHouse在有赞的未来规划

  • ClickHouse容器化部署:容器化部署更高拥有更好的弹性伸缩能力,也能和其他的服务进行混合部署来节省成本。我们觉得容器化部署是可以做的,虽然我们现在容器化部署的读写分离已经接入了这块的支持,但是我们觉得如果把线上集群也进行k8s化会带来更大的收益。

  • ClickHouse更多的推广,更多业务的接入:因为ClickHouse在2020年刚流行起来,好多以前的系统还是通过Druid、Kylin等OLAP组件去满足,所以未来可以去做更多的业务接入。

  • ClickHouse更好的平台化以及故障防范:平台化方面我们做还不是非常完善,特别是业务方易用性、多租户隔离、限流、熔断、监控报警,业务治理等方面需要更多的投入。

  • 一些业务以前是Druid单链路,我们准备把它改成Druid + ClickHouse的双链路或者将Druid的业务迁移到ClickHouse。

2. ClickHouse当前痛点

我们在使用ClickHouse的过程中发现了ClickHouse还没有到完美的程度。

  • ClickHouse不太像传统意义上的分布式数据库,整体来说比较“手动档”,很多地方都需要用户自己去设计一个流程去完善,包括写入和物化视图。

  • 没有自动Rebalance的能力,导致扩容缩容运维特别复杂。这个痛点还蛮大的,会增加运维的工作量。

  • Join不是采用Shuffle/Exchange Join,数据量大的时候性能差。并且ClickHouse的Join语法也不是支持的非常好。

  • 行级别的Update/Delete性能差,官方也不推荐。这个其实从业务层面来说,他们真的很需要这个,但是很多OLAP就是不支持,因为支持这个功能非常影响性能。业界比较多的是采用impala + kudu方案。

据我了解很多大厂都针对这些痛点进行了尝试改进。比如说存算分离,Exchange Join实现等很多大厂都在做。

那么Apache Doris呢?

现在Doris很火很多大厂已经开始尝试使用了,相比ClickHouse,Doris更像是一个分布式数据库,也解决了部分痛点,比如自动平衡、支持Shuffle Join等。而且是国人做的也是蛮骄傲的事情。但是就目前为止单表的性能包括成熟度、稳定性还不如ClickHouse需要继续发展。

还有一些是没有开源的如Hologres、TIFlash等,这些可以尝试以下,但是未知程度会比较高,稳定性成熟度这些也不知道,也不知道未来是否开源。

3. CliCkHouse + Doris POC实现

于是我们产生了一个想法:如果我们能够把两者结合,我们利用高性能的ClickHouse算子实现替换基于Impala的Apache Doris,在未来甚至能打造出更好的分布式OLAP数据库。

当然现在只是POC实现阶段,POC实现主要是验证可行性的,因此实现起来以快为主,很多地方可能只是临时Mock之类的。

验证主要是分成以下几个步骤:

① 尝试将Doris be的代码一起编译到ClickHouse中运行起来。(DONE)

② 尝试将解析相关fragments,Doris比较像分布式数据库,所以前端会发一个分布式执行计划,然后再到后端去解析,以前的话是基于impala这套机制再加上自己改动去做这件事,现在把它映射成ClickHouse的几个简单算子(Storage扫描、聚合、表达式),目前能跑一个简单的单机SQL。(DONE)

③ 尝试将Doris OLAP的存储实现成ClickHouse的Storage(StorageDorisOLAP),并且能跑一个简单的单机SQL,如果能用Doris的相关存储,也能使用其自动平衡能力。因为ClickHouse存储一般是Storage xxx,比如说Storage MergeTree。我们实现了StorageDorisOLAP,最终是能够用到Doris的存储,Doris的存储跟ClickHouse的存储有点像,但Doris有一个最大的好处是可以自动进行平衡,并且最小单位是Tablit比物理的partition更小,在这方面会更有优势。(DONE)

④ 尝试实现Doris exchange node的算子,能跑通一个简单的分布式SQL。(DONE)

总体来说,我觉得应该是可行的,但是可能比较复杂,如果顺利的话,后期我们会投入更多精力在这上面。

下面是一些运行时的截图。

4. 大数据数据库未来的趋势

  • 云原生:可任意扩缩容,存储计算分离,多租户,Serverless,按需付费(Snowflake、BigQuery、SaaS/Daas)。其实从国外Snowflake和BigQueue这两款软件基本上就概括了这些,特别是Snowflake。

  • 多模数据库:融合OLAP、OLTP能力,一站式解决(TiDB、TiFlash、Hologres)。我们面对很多用户需求的比如说日志分析的时候,他们又要查明细又要去查一些指标分析,我们现在没有办法用一个数据库,只能ClickHouse加上ES或者加上TiDB、mysql这种去处理。

  • 利用新硬件:现在也有一些GPU、FPGA等新硬件能够加速整个数据库的发展。

05

问答环节

Q:一级索引和二级索引之间有没有什么关系?

A:这个应该没有什么关系的,一级索引就是它的主键,主键的索引就叫一级索引,主键索引就是你拿这个东西在MergeTree里进行排序的。二级索引的话这些Bloom Filter、minmax那些,会比如说你跳过整个块,比如说minmax不符合范围。

Q:Order by是不是完全可以取代主键?

A:Order by它是如果你不配主键它就是主键,但是好像也可以去配主键,但一般没人搞这么复杂。

Q:ClickHouse读写分离下面的队列是做什么的?

A:因为这里工作流程列的比较详细,我就没有更细致的讲,其实比如说Spark把数据写入到k8s临时集群之后,那总要有个方法去通知我们这边的CHSegementPusher,然后CHSegementPusher接到了这个任务通知才会去把数据发到HDFS上面某一个目录。同样puller也是这样的,左边pusher全部发完之后puller也要去同样的我们主的DataImportJob也会去Distributed Task Queue发任务,让相关的puller去从HDFS上面取这个数据。它就是用来push、puller这些通信的,但是push、puller这些东西其实是通过Apache Helix去管理的,Apache Helix是一套基于Zookeeper的一些集群管理的一套系统,可以把这些东西定义成results,比如说master、puller、push,不同角色我可以定义成results。

Q:数据去重是怎么处理的?

A:数据去重是指哪方面需要去重,因为如果数据导入是没有办法做到Exactly-Once。从本质来讲,虽然ClickHouse现在有个block防止比如说网络抖动的时候,相同的block它可以不再导入,这种可以防止一下去重,但是理论上如果不是这种情况,它没有办法做到Exactly-Once。如果真的要去重,还有一种方法就是用ReplacingMergeTree可以异步地进行去重,当然也可以用一些方法比如每隔多少时间去做一下optimize强制聚合的这么一个事情,但是这个不能太频繁,因为这个操作会非常重。

今天的分享就到这里,谢谢大家。


在文末分享、点赞、在看,给个3连击呗~


分享嘉宾:

以上是关于ClickHouse 在有赞的实践的主要内容,如果未能解决你的问题,请参考以下文章

Flink 在有赞的实践和应用

Flink 在有赞的实践和应用

Druid在有赞的实践

Apache Flink 在有赞的实践和应用

实时数仓在有赞的实践

实时数仓在有赞的实践