Flink + Hudi 在 Linkflow 构建实时数据湖的生产实践
Posted Flink 中文社区
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink + Hudi 在 Linkflow 构建实时数据湖的生产实践相关的知识,希望对你有一定的参考价值。
-
背景
-
CDC 和数据湖
-
技术挑战 -
效果 -
未来计划 -
总结
一、背景
Linkflow 作为客户数据平台(CDP),为企业提供从客户数据采集、分析到执行的运营闭环。每天都会通过一方数据采集端点(SDK)和三方数据源,如微信,微博等,收集大量的数据。这些数据都会经过清洗,计算,整合后写入存储。使用者可以通过灵活的报表或标签对持久化的数据进行分析和计算,结果又会作为 MA (Marketing Automation) 系统的数据源,从而实现对特定人群的精准营销。
-
数据碎片化,由于 MySQL 大表 online DDL 风险较大,随着业务复杂度的提升,往往需要增加新的子表来扩展业务属性,也就是说一个完整的用户数据会散落在多张表中,这对查询十分不友好。 -
多维度查询无法实现,由于关系型数据库的优势不是多维度查询,并且给所有字段都加索引也并不现实,所以需要一款可支持 OLAP 查询引擎的数据组件来支撑多维分析的业务场景。并且考虑到未来可分别独立扩展的可能,我们也优先考虑计算和存储分离的架构。
二、CDC 和数据湖
-
Hudi 提供了一个在 HDFS 中 upsert 的解决方案,即类似关系型数据库的使用体验,对于可更新数据非常友好,并且也符合 MySQL binlog 的语义。 -
增量查询,可以很方便的获取最近 30 分钟,或者 1 天内发生变化的数据,这对于一些可叠加的离线计算任务非常友好,不再需要针对全量数据进行计算,只需要针对变化数据进行计算,大大节省了机器资源和时间。 -
可以实时同步元数据到 Hive,为“入湖即可查”创造了条件。 -
对 COW 和 MOR 两种不同使用场景分别进行了优化。 -
Hudi 社区开放且迭代速度快,在其孵化阶段就被 AWS EMR 集成,然后被阿里云 DLA 数据湖分析[2]、阿里云 EMR[3]以及腾讯云 EMR[4]集成,前景不错,同时 ApacheHudi 国内技术交流群讨论非常热烈,国内基于 Hudi 构建数据湖的公司越来越多。
三、技术挑战
3.1 CDC 运行模式定制
■ 全量模式
An optional, comma-separated list of regular expressions that match names of schemas specified in
table.include.list
for which you want to take the snapshot.
/**
*Perform a snapshot andthen stop before attempting to read the binlog.
*/
INITIAL_ONLY("initial_only",true);
// MySqlConnectorTask.java
if(taskContext.isInitialSnapshotOnly()){
logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
chainedReaderBuilder.addReader(newBlockingReader("blocker",
"Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
chainedReaderBuilder
.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
}
即在 initial_only 的模式下 Debezuim 会使用 BlockingReader 替代 BinlogReader 将线程阻塞,不再进行增量消费。
■ 增量模式
DebeziumOffset specificOffset =newDebeziumOffset();
Map<String,Object> sourceOffset =newHashMap<>();
sourceOffset.put("file", startupOptions.specificOffsetFile);
sourceOffset.put("pos", startupOptions.specificOffsetPos);
specificOffset.setSourceOffset(sourceOffset);
3.2 部分更新(Patch Update)
{
"id": 123,
"ts": 1435290195610,
"data": {
"age": 25
}
}
Simply overwrites storage with latest delta record
3.3 一批次内相同 rowkey 数据的归并
3.4 Schema evolution
What's Hudi's schema evolution story
Hudi uses Avro as the internal canonical representation for records, primarily due to its nice schema compatibility & evolution[9] properties. This is a key aspect of having reliability in your ingestion or ETL pipelines. As long as the schema passed to Hudi (either explicitly in DeltaStreamer schema provider configs or implicitly by Spark Datasource's Dataset schemas) is backwards compatible (e.g no field deletes, only appending new fields to schema), Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date.
-
Backwards compatible: 向后兼容,用新的 schema 可以读取旧数据,如果字段没值,就用 default 值,这也是 Hudi 提供的兼容方式。 -
Forwards compatible: 向前兼容,用旧 schema 可以读取新数据,Avro 将忽略新加的字段,如果要向前兼容,删掉的字段必须要有默认值。 -
Full compatible: 支持向前兼容,向后兼容,如果要全兼容,那么就需要只添加有默认值的字段,并且只移除有默认值的字段。 -
No Compatibility Checking:这种情况一般来说就是需要强制改变某个字段的类型,此时就需要做全量的数据迁移,不推荐。
3.5 同时查询和写入导致异常
Error checking path :hdfs://hudipath/.hoodie_partition_metadata, under folder: hdfs://hudipath/event/202102; nested exception is java.sql.SQLException: Query failed (#20210309_031334_04606_fipir)
基于上述信息,怀疑是在查询的同时,元数据信息被修改而导致的问题。在求助社区后,我们将 HoodieROTablePathFilter 中的 hoodiePathCache 改为线程安全的 ConcurrentHashMap, 重新打包得到 hudi-hadoop-mr.jar 和 hudi-common.jar ,替换到 presto/plugin/hive-hadoop2 的目录下,重启 Presto。后续没有发现NPE的情况。
四、效果
-
支持可变数据。 -
支持 schema evolution。 -
计算存储分离,支持多种查询引擎。 -
支持增量视图和时间旅行。
-
实时数据写入过程简化,之前的更新操作实现繁琐,现在开发过程中基本不用关心是新增还是更新操作,大大降低了开发人员的心智负担。 -
实时数据入湖到可查询的时间缩短,虽然我们的采用的是 COW 的表模式,但实际测试发现入湖到可查询的时效性并不低,基本都在分钟级。 -
离线处理性能提升,基于 Hudi 的增量视图特性,每天的离线任务可以很容易的获取过去24h变化的数据,处理的数据量级变小,进而带来更短的处理时间。
五、未来计划
5.1 Flink 集成
5.2 并发写
-
垂直分表,即将两部分文件分开,CDC 数据通过 Spark Streaming 写入,离线计算结果写入另一个文件,避免并发写。 -
模拟成 CDC 消息回写 Kafka,为了查询性能不能分表的情况下,离线计算结果会模拟成 CDC 消息写入 Kafka,再通过 Spark Streaming 写入 Hudi。但缺点也是很明显的,就是离线任务的结果反映到最终存储的时延较长。
5.3 性能优化
■ 索引
How does the Hudi indexing work & what are its benefits?
The indexing component is a key part of the Hudi writing and it maps a given recordKey to a fileGroup inside Hudi consistently. This enables faster identification of the file groups that are affected/dirtied by a given write operation. Hudi supports a few options for indexing as below
• HoodieBloomIndex (default) : Uses a bloom filter and ranges information placed in the footer of parquet/base files (and soon log files as well) • HoodieGlobalBloomIndex
: The default indexing only enforces uniqueness of a key inside a single partition i.e the user is expected to know the partition under which a given record key is stored. This helps the indexing scale very well for even very large datasets[11] . However, in some cases, it might be necessary instead to do the de-duping/enforce uniqueness across all partitions and the global bloom index does exactly that. If this is used, incoming records are compared to files across the entire dataset and ensure a recordKey is only present in one partition.• HBaseIndex
: Apache HBase is a key value store, typically found in close proximity to HDFS. You can also store the index inside HBase, which could be handy if you are already operating HBase.
You can implement your own index if you'd like, by subclassing the HoodieIndex
class and configuring the index class name in configs.
■ 更新
-
参数调整,要是否有办法平衡文件的数量和大小。 -
尝试部分业务表使用 MOR 模式,MOR 在更新时会先将数据写入日志文件,之后再合并到 Parquet,理论上可以降低覆写 Parquet 文件的频率。 -
讨论业务上的 trade-off 来换取更好的写入速度。
六、总结
引用链接
[2] 阿里云DLA数据湖分析: https://help.aliyun.com/document_detail/173653.html?spm=a2c4g.11186623.6.576.1562672dKa8RYR
[3] 阿里云EMR: https://help.aliyun.com/document_detail/193310.html
[4] 腾讯云EMR: https://cloud.tencent.com/document/product/589/42955
[5] [HUDI-1255] Add new Payload(OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage: https://github.com/apache/hudi/pull/2056
[6] [HUDI-1160] Support update partial fields for CoW table: https://github.com/apache/hudi/pull/2666
[7] [HUDI-1381] Schedule compaction based on time elapsed: https://github.com/apache/hudi/pull/2260
[8] wiki: https://cwiki.apache.org/confluence/display/HUDI/FAQ
[9] schema compatibility & evolution: https://docs.confluent.io/current/schema-registry/avro.html
[10] RFC - 24: Hoodie Flink Writer Proposal: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal
[11] very large datasets: https://eng.uber.com/uber-big-data-platform/
[12] ding.wang@linkflowtech.com: mailto:ding.wang@linkflowtech.com
▼ 关注「Flink 中文社区」,获取更多技术干货 ▼
戳我,查看更多技术干货!
以上是关于Flink + Hudi 在 Linkflow 构建实时数据湖的生产实践的主要内容,如果未能解决你的问题,请参考以下文章
2天,撸完Flink+Hudi+Iceberg数据湖落地系统,爽!