基于数据湖格式构建流式增量数仓—CDC
Posted 阿里云云栖号
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于数据湖格式构建流式增量数仓—CDC相关的知识,希望对你有一定的参考价值。
摘要:本文整理自阿里云开源大数据平台技术专家毕岩(寻径)在 Apache Con ASIA 的分享。本篇内容主要分为四个部分:
1.湖格式& Hudi & CDC
2.湖格式设计实现 CDC 的思考
3.Hudi CDC 实现
4.湖格式 Streaming 的优化
2021年中 Databricks 发布了一篇基于 Delta Lake 实现 CDC 场景的介绍文档,2022年初我们在阿里云EMR 内部 Delta Lake 版本实现的 CDC 的能力,同期在 Apache Hudi 提案了 Hudi 基于 Spark 实现 CDC 的设计文档和实现代码。
结合这些经验,今天以 Apache Hudi 为主,分享一下数据湖格式上实现 CDC 的一些思考和注意点,以及一些流式 streaming 通用的优化点,和 Hudi CDC 的后续规划。
湖格式 & Hudi & CDC
该部分主要介绍下此次分享涉及到的一些概念,包括湖仓、数据湖格式、Apache Hudi,以及 Change Data Capture(CDC)的一些需要了解的东西。
湖仓 & 湖格式
相信大家对于数据仓库,数据湖,进而到两者结合的湖仓的演变有了一些了解,这里就不过多介绍了。湖仓(LakeHouse)有以下一些关键特性。
介绍几个关键特性:
- ACID 事务:同一张表经常会同时有多个工作流来读写,事务保证了我们能够读、写到正确的数据;
- Schema Enforcement 和数据管理:可以加上 Schema Evolution。Enforcement 在 Databicks 相关文章解释上等同于 Valication,在写入数据时,严格检测 schema 并要求和目标表定义的一致。Schema Evolution 允许修改表的字段信息(如增删字段,修改字段类型和描述等)。另外,湖仓还应提供健壮的湖表治理和审计的能力;
- 支持结构/非结构数据,支持多类API:湖仓架构能够支持半/非结构化数据(如JSON,图像,语音等)的存储,以及提供除了基本SQL之外丰富的API来处理数据,应用在如机器学习等场景;
- 批流一体:数据湖提出之初,很重要的就是替代 Lambda 架构,批流一体能够有效的简化流式和离线两条数据链路的开发和运维成本;
- 存算分离:成本是各个公司都需要关注的。如果存储和计算都能按需伸缩,会更便于精细化控制成本。
我们发现大部分的湖仓关键特性是需要由底层存储之上的数据组织方式,即数据湖格式来提供的,我认为这也是 DeltaLake、Apache Hudi,Apache Iceberg 近两年兴起的主要背景和原因吧。
Apache Hudi
Apache Hudi 是一个构建于自管理数据库层之上的,使用增量数据流来构建数据湖的一个功能丰富的平台。相对于其他湖格式,Hudi具备更细粒度的数据布局(FileGroup),支持多种索引提升 Upsert 性能,以及在开源版本上较为丰富的自动化湖表管理能力。
CDC
Change Data Capture:定义了一种场景,即识别并捕获数据库表中数据的变更,并交付给下游进一步处理。CDC是对针对行级数据记录的。其中数据的变更信息,即 CDC 的数据结构,包括变更是什么样的操作(有三类:insert,update,delete),变更发生的时间点,以及变更前后的数据。显然对于insert操作该记录的变更信息中是没有旧值的,对于 delete 操作该记录的变更信息中是没有新值(当前值)的。
CDC 典型方法
CDC 不是数据湖格式特有的概念和场景,它存在已久。并且在传统数据库有些一些典型的方法:
- 时间戳/版本号:是在表上添加一个类似于 created_time 和 last_modified_time 这样的字段,标识记录的创建时间和最新修改时间,查询时根据 modified_time 做过滤,得到变化的数据。这个方法有几个明显的缺点:
- 不能感知到delete的变化。
- 不能直接获取得到update的旧值,因此这类方案仅适用于没有delete操作,且不关注旧值的业务场景。
- 由于没有快照或者版本的概念,不能准确的捕捉每次变更。
- 表Diff:通过对比表的前后两个状态或者快照的数据来获取 CDC 数据。由于要做Diff,就会有快照间 Join 操作,该方法查询性能较差,且占用更多的计算资源。
- 数据库的触发器:有些数据库如 Oracel 等支持为 insert,update,delete 操作创建触发器。在执行 insert/update/delete 操作时,自动将旧值和新值,以及操作类型,和时间戳等信息写入另外一张影子表。由于直接将变更的数据写入到了另外一张表,可以直接使用 sql 查询得到,查询性能和语法支持都较好。但这个方法增加了写入时开销。
- 事务日志:目前数据库会保留了表的操作事务日志,用于备份和系统恢复。我们可以从这里面提取出 CDC 数据,像 mysql,RDS 等这类数据库都支持。该方法不会增加写入的开销,但获取 CDC 数据需要从事务日志解析出来,同时由于事务日志会定期归档,不会永久保存,需要及时读取出来,这也就是我们通常会对接 Kafka 来消费 binlog 的场景。
CDC 场景示例
上图定义了一个通用的数仓场景,user_city_tbl 和 user_name_tbl 维表做 Join 操作将打宽后的数据更新到 user_name_city_tbl中(类似于 ODS 到 DWD 的链路)。
user_name_city_tbl 按 city 聚合统计出城市的常驻人口后同步成 city_population_tbl(类似 DWD 到 DWS 的链路)。
在没有实际 CDC 功能的场景下,传统数仓的解决方案一般是基于 Kafka 里的流式数据生成 ODS、DWD、DWS 的各层数据,便于查询实时的数据,同时小时/天粒度做离线的全表/分区的ETL修复数据用于历史查询使用。如上例离线方式,user_city_tbl 每次更新后,需要全表和 user_name_tbl 做 Join,然后 overwrite 到 user_name_city_tbl,后续也需要全表按 city 聚合 overwrite 到完整的 city_population 表中。很难兼得增量数据处理、实时性、低管理运维成本这几个方面。
在具备 CDC 的场景下,只需要一个流式 workflow 即可。如 user_city 表变更后得到change data,仅将这部分数据和 user_name 表做 Join,就可以针对 user1 更新 city 信息,将新数据 user5 对应的记录通过 upsert 语法(Merge Into)同步至 user_name_city_tbl 中。同时得到 user_name_city_tbl 的变更数据(user1的 city 从 bj 变更为 hz,新增 user5 记录及对应的 name 和 city 的值),实现对 bj city 的人口数减一,对 hz 和 wh 两个 city 字段加一。仅使用一条流式 workflow,能够实时的将增量变更信息同步更新至下游。
举例补充另外一个场景:银行通过获取当天凌晨和前一天凌晨之间的账户金额变更来检测账户的健康程度。若有两个账户,A全天账户没任何操作,而账户B全天操作上百次,但最终金额也没有变化。如果单纯的比较前一天凌晨和当天的账户金额是否变化是没办法判断账户的健康指标的。对于类似场景,CDC 的解决方案中需要具备追溯每一次的变更的能力。
综合这些场景,梳理了 CDC 这个场景应该具备以下能力:
- 变更信息中应该包含所有的表字段,不能仅包括主键的;
- 对于 delete/update 操作,应该包括操作前的旧值;
- 能追溯每一次的变更。
CDC 输出格式
最后,我们来看下 CDC 的输出格式。
关于 CDC 的输出格式,我认为只要包含了全局的变更信息,包括操作类型,操作时间或对应版本,以及前后值就足够。这里列出了典型的 debezium 的格式,和上面场景示例中使用的格式,也是 deltalake 所采用的。debezium 是一个较为通用的格式,便于集成到已有系统中,而在部分场景 deltalake 自定义的格式对SQL查询更友好。
湖格式设计实现 CDC 的思考
CDC 设计
前面介绍了在数据库中典型的 CDC 方法,但数据湖格式较数据库、传统基于 Hive 的数仓还是有很大不同的。湖格式的特点:
- 支持多版本/多快照。基于这个特性,至少可以使用表 Diff 的方法来获取 CDC 数据。
- 每个版本准确的映射到一组有效数据文件,且单次操作会在文件元数据层面记录数据文件的变更,即每次操作(如insert,merge into等)会将新增数据文件标记为有效的,历史部分数据文件在当前版本被标记为无效。
- 没有常驻服务,操作依赖现有的计算引擎 Spark 或者 Flink。没有后台处理进程,那类似于数据库触发器的方法,是没办法由后台执行,如果需要只能在当前操作时间内完成,增加部分写开销。
在设计湖格式支持 CDC 需要考虑到:
- CDC 各场景能力的覆盖,包括上述提到的较为复杂的。至于提供的信息是否被需要由下游场景决定,但湖格式本身要能将需要的 CDC 相关信息都交付给下游。
- 读写性能。类似于触发器或者事务日志的方式,需要记录额外的信息,要考虑到对写入性能的影响;同样对于表 Diff 这样的方式,要考虑到查询读取性能。
- 约定输出格式。
传统数据库的 CDC 方法中,时间戳或者版本号的方法在场景支持上的缺陷;湖格式没有自己常驻服务,不存在事务日志,因此我们先来看下表 Diff 的实现方式。其原理就是基于湖格式本身的 time-travel 查询历史版本的能力,和后续的版本逐一做 Diff 得到 CDC 数据。优点是不会增加写入开销,缺点是查询性能差。如果我们想要进一步优化查询性能,可能的思路就是降低 Diff 或 Join 的粒度,从最差做全表的 Diff,到仅做当前 commit 涉及到的分区级 Diff,再进一步按桶等等。DeltaLake 可以通过当前 commit 内新增文件和被标记无效的文件之间来做。Hudi 本身具备分区级更小的 FileGroup 文件组的概念,也可以减少 Diff 的数据量。
再看一下 DeltaLake 在 CDC 场景的实现方案 CDF,Change Data Feed。其核心在于数据写入时直接持久化 CDC 数据。类似于数据库 trigger 的方式,直接保存 CDC 需要的所有信息,查询时直接加载这部分数据。优点是查询性能最好,缺点是增加了写开销。如果继续在这个方案下做一些优化:
- 根据不同的操作,避免每次commit都持久化。比如数据首次写入表,那么所有的数据本身一定都是insert操作类型的。这种就不需要再额外双写一份CDC数据。
- 如果表有主键,那可以仅持久化主键,然后将前后两个版本含主键的数据加载成两个map结构,分别以点查的方式获取旧值和当前值。若前值为空,该变更为insert操作,若新值为空,该操作为delete操作,否则即为update操作;这样减少持久化的数据量,也就减少了写时开销;当然也可以同时持久化记录的操作类型,来更准确的获取旧值和当前值。
我们结合以下两个场景来考虑这两个方案:
- 上游数据完成一次 commit,下游何时消费是不确定的,很可能一次性消费几个 commit 的 CDC,采用表 Diff 的方式,需要 Diff 每两个相邻版本,性能随 commit 的数量成倍下降;
- 实际的 streaming 场景,每 batch 更新全表数据量占比很小,我接触的场景占比小于0.0003。当然不同场景占比是不同的,但显然较低的占比,使得写入时增加的开销是可以接受的。
基于以上考虑,Databricks 和阿里云EMR 在 DeltaLake 的实现都采用了 CDF 的方案,同时阿里云EMR 团队贡献到 Apache Hudi 社区的也是基于此实现。
CDC 实现
确定了以 Change Data Feed 作为设计方案后,就需要考虑是具体实现上的注意事项了。
首先是写入。
- 针对不同湖格式各类写操作,明确涉及到的文件元数据变化。CDF 方案下可优化的第一点是根据不同的操作,判断是否需要持久化 CDC 数据。也就是一部分操作的变更信息直接读取 CDC 文件,而另一部分操作的表更信息就从普通的数据文件中提取转换。但是不同的湖格式对不同的操作有自己的定义,因此要具体湖格式具体分析。比如 DeltaLake 的 Insert Into/Insert Overwrite、Update,Delete,Merge 等操作就是我们正常认知的。而 hudi 由于引入了主键 recordKey 和比较建 preCombineField 的概念,即使是简单的 Insert Into 的 SQL 语法可能对应实际逻辑是 Update 操作。
- 针对 CDC 场景涉及到的写操作,明确需要拓展 CDF 能力的场景。湖格式内置很多湖表管理操作,如 DeltaLake 的 vacuum 和 Hudi 的 Clean 用于清理历史数据, DeltaLake 的 optimize 和 Hudi 的 clustering 用于合并小文件和做 zorder,Hudi 的 compaction 针对 mor 表合并增量数据文件等,这些操作都不会涉及到实际表的数据文件,仅仅是清理文件或者对数据重分布而已,是不需要关注和处理的,在查询时遇到这些操作,可直接忽略。而其他 DML 操作是直接修改表的数据,需要感知并处理的。这里举例说明下:
- 对于 Insert Into 操作,DeltaLake 不会读取任何其他已有文件,仅新增数据文件。因此 DeltaLake 实现 CDF 方案执行 Insert Into 不需额外再持久化,而仅需查询时加载到这批文件,将数据转换成约定的输出格式。但在 Hudi 内由于有数据 combine 的逻辑或者将数据写入现存的小文件的优化,会读取已有文件再重写,因此就需要持久化 CDC 数据。
- 对于 drop partition 操作,DeltaLake(社区版本不支持 drop partition,阿里云EMR 版本支持)和 Hudi 都直接将该分区的所有数据文件都标记为删除。这样也不需要持久化任何信息,查询时找到这些文件,加载并将每条记录标记为delete的操作类型,添加上其他如时间戳信息即可。
- 对于 update 操作,最底层的操作一定是加载满足 where 条件记录的数据文件,更新满足 where 条件的那部分数据,然后连同未修改的数据直接一起写入到新文件。这样的情况下,就需要拓展 CDF 能力。
最后在具体实现上,我们还要注意以下几点:
- 持久化的 cdc 数据需要保存到文件系统中,可能会改变原本的表的文件布局,这样的改变可能会对其他操作语义,如湖管理功能造成影响,必要时需要联动调整;
- 额外的 cdc 写入操作依然要保证 ACID 的语义。
基于 CDF 方案的实现了写操作后,查询变更数据时就会遇到这三类文件:
- 持久化的cdc数据文件。在正常情况下,CDC 数据文件记录了完全的 cdc 数据信息,包括变化数据的操作类型,旧值和新值,可以直接加载读取返回。
- 全为新数据的文件。如 Insert Into 操作引起的,查询时对每条来自这样文件的数据添加上值为insert的操作类型字段,和其他信息。
- 全为被删除的文件。如 drop partition 操作引起的,查询时对每条来自这样文件的数据添加上值为delete的操作类型字段,和其他信息。
- 上图右侧为抽象的一个数据结构 CDCFileSplit,主要的字段是 cdcFileType 和 filePath:
- filePath 为 CDC 查询时涉及到的数据文件,可能为 CDC 数据文件或者普通的数据文件。
- cdcFileType 标识 filePath 是哪类文件,也因此决定了如何从该文件中抽取解析 cdc 数据的具体逻辑。
- 在讨论 CDF 方案可优化点时提到,对于有主键概念的表可以仅持久化主键(和操作类型)来减少写时开销。那么就需要另外两个包含了该主键对应的旧值和当前值的数据文件 preImageFilePath 和 postImageFilePath,来平衡对写时开销较为在意的场景。
Hudi CDC 实现
结合 Apache Hudi 我们来看下具体实现。
Hudi CDC Write 实现
由于引入了主键和比较键的概念,Hudi 抽象了自己的写操作类型,如上图左侧所示。其中有普通 Insert/Upsert/Insert Overwrite 等常规写操作,也有 Cluster/Cmpact 等这样的湖管理操作。同上面提到写时优化的一些思考,我们仅需要关注标红的普通写操作,而可以直接忽略湖表管理操作。
同样由于有了自己的写操作语义,Hudi 抽象了两类写处理方式:其中HoodieWriteHandle(上图中有笔误)的子类 HoodieMergeHandler 是数据执行 upsert 的核心逻辑,将新数据和同一个主键的老数据(如果存在)合并,最终写入数据文件。而 HoodieWriteHandle 的其他子类处理的是非合并场景下的写入操作,如 bulk_insert 等。因此 HoodieMergeHandle 也就是我们之前提到的在必要的场景下要拓展 CDF 能力的地方。
DeltaLake 和 Hudi 的 CDC 查询流程基本一致,但由于数据布局的不同,在实现细节上也有不同。以 Hudi 为例来看一下湖格式上完整的 cdc 查询流程:
- 根据请求指定的 start,end 区间,获取关联到的 commit 信息,然后根据 commit 中的写操作过滤掉湖表管理这些不影响数据的 commit;
- 根据每个 commit 的写操作,或是读取 cdc 数据文件,或是加载当前版本的数据文件,或是加载前一个版本的数据文件,得到一个类似于前面提到的 CDCFileSplit 的对象列表(对应到 Hudi 代码中的 HoodieCDCFileSplit)。
- 按 CDCFileSplit 中的 cdcFileType 定义的加载策略,从文件中提取、解析成 cdc 数据,直接 union 返回。
Hudi CDC 使用示例
Hudi 端开启 CDC 的方式也是很简单的。建表 SQL 时或者 Spark DataFrame 写入时开启 hoodie.table.cdc.enabled 参数即可。这样数据写入时会自动持久化必要的 CDC 数据。查询时指定 cdc 的查询类型,及 start 和 end 的区间。
补充说明:
- 按当前实现查询时需要将 query.type 的配置调整为:
hoodie.datasource.query.type = incremental hoodie.datasource.query.incremental.format = cdc
- 同时支持 CDC 持久化选择持久化的类型,参数为:
hoodie.table.cdc.supplemental.logging.name
默认值为 data_before_after,会持久化所有 CDC 数据,查询时不再需要加载其他数据文件,利用提高 CDC 查询效率。其他可选值为 op_key_only 和 data_before,仅持久化操作类型和主键,或者额外多持久化旧值,这样写入时减少了一点 overhead,但查询时需要加载其他数据文件来获取确实的旧值当前值。
Hudi CDC 后续规划
目前 Hudi 已经完整支持了 Flink 和 Spark 两个引擎的 CDC 读写功能。
后续将会继续拓展 Spark SQL 语法,便于查询 CDC 数据;同时支持类似 DeltaLake 的扁平化的 CDC 输出格式,给 Hudi 用户另外一种选择集成到自己的数仓场景中。
目前 Hudi 已经完整支持了 Flink 和 Spark 两个引擎的 CDC 读写功能。
后续将会继续拓展 Spark SQL 语法,便于查询 CDC 数据;同时支持类似 DeltaLake 的扁平化的 CDC 输出格式,给 Hudi 用户另外一种选择集成到自己的数仓场景中。
湖格式Streaming 的优化
CDC 大多还是用于 streaming 场景,构建增量实时数仓,遇到的问题也是 streaming 通用的。这里列举两类
- Apache Hudi 或者阿里云EMR 版本的 DeltaLake 等湖格式都具备表的自管理能力,如定期清理历史数据文件的 vacuum(DeltaLake)和 clean(Hudi)操作,和合并小文件做 Zorder 优化的 optimize(DeltaLake)和 clustering(Hudi)操作。但这些如果放到 streaming 任务中某次 batch提交后操作,会占用当前 batch 的执行时间,影响写入性能,甚至有些场景直接导致任务失败,影响正常的写入流程。
- Streaming 任务实现复杂:基于 Spark Streaming 开发流式任务目前需要通过 dataframe 的 API 来实现,没有像离线场景的 SQL 语法那样简单。
针对这两类问题,介绍以下阿里云EMR 团队的解决方案。
应对第一类问题,阿里云EMR 在 Data Lake Formation 数据湖构建产品上支持了自动化湖表管理。
在实际生产使用中,我们可以在流式 workflow 中关闭 DeltaLake 或者 Hudi 的定期执行表管理的功能,推送 commit 信息到 DLF 服务端。DLF 会结合我们定义的策略,结合表的实时指标,来判断和采取相应的湖表管理操作。
不同于定期执行的无脑式执行,DLF 可以精细化的感知湖格式表的状态,比如实时分析历史过期数据的占比,根据对应策略中的阈值判断是否提交 clean 或 vacuum 任务来清理,再比如自动感知以时间分区的表的分区状态,自动对刚完成写入的分区执行小文件合并类的任务。另外,DLF 执行的湖管理任务是单独启动,不在当前的流式任务中,不影响正常的写入和性能。
应对第二类 Streaming 任务实现复杂问题,阿里云EMR 拓展了 Spark 的 SQL 语法,实现了 StreamingSQL。
如上述示例,左侧分别创建了 Hudi 目标表,和一张 Kafka 的源表。右侧是拓展的 StreamingSQL 语法,CREATE SCAN 创建一个关联到 Kafka 源表的流式视图;CREATE STREAM 语法创建流,消费 Kafka 数据并按用户给定的 SQL(示例中位 Merge Into 语法)完成写入操作。这样,我们可以极大的简化任务的开发和运维成本。
本文为阿里云原创内容,未经允许不得转载。
以上是关于基于数据湖格式构建流式增量数仓—CDC的主要内容,如果未能解决你的问题,请参考以下文章
智能湖仓架构实践:利用 Amazon Redshift 的流式摄取构建实时数仓