Apache Hudi - 初步了解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Hudi - 初步了解相关的知识,希望对你有一定的参考价值。

参考技术A

URL:
https://zhuanlan.zhihu.com/p/149706105?utm_source=com.ucmobile

自己关于 Apache Hudi 的一些简单的了解和想法。

Hudi 是 Uber 主导开发的开源数据湖框架。所以大部分的出发点都来源于 Uber 自身场景,比如司机数据和乘客数据通过订单 Id 来做 Join 等。在 Hudi 过去的使用场景里,和大部分公司的架构类似,采用批式和流式共存的 Lambda 架构,我们先从 延迟,数据完整度还有成本 三个方面来对比一下批式和流式计算模型的区别。

批式模型就是使用 MapReduce、Hive、Spark 等典型的批计算引擎,以小时任务或者天任务的形式来做数据计算。

流式模型,典型的就是使用 Flink 来进行实时的数据计算。

针对批式和流式的优缺点,Uber 提出了增量模型,相对批式来讲,更加实时,相对流式而言,更加经济。

增量模型,简单来讲,是以 mini batch 的形式来跑准实时任务。Hudi 在增量模型中支持了两个最重要的特性,

在增量模型中,Hudi 提供了两种 Table,分别为 Copy-On-Write 和 Merge-On-Read 两种。

对于 Copy-On-Write Table,用户的 update 会重写数据所在的文件,所以是一个写放大很高,但是读放大为 0,适合写少读多的场景。对于这种 Table,提供了两种查询:

具体的流程见下图 gif:

[图片上传失败...(image-defbd0-1649294241693)]

对于 Merge-On-Read Table,整体的结构有点像 LSM-Tree,用户的写入先写入到 delta data 中,这部分数据使用行存,这部分 delta data 可以手动 merge 到存量文件中,整理为 parquet 的列存结构。对于这类 Tabel,提供了三种查询:

具体的流程见下图 gif:

[图片上传失败...(image-1b5dc0-1649294241692)]

关于上述的内容,Hudi 自身提供了一个比较便捷的 Docker Demo ,让用户可以很快地上手。

谈到数据湖框架,大家都会说出现在比较流行的三个开源软件,分别为 Delta Lake、Apache Hudi 和 Apache Iceberg。虽然经常把他们拿来一起比较,但是实际上每个框架的背景都是不一样的。

比如 Iceberg 的初衷是解决 Netflix 内部文件格式混乱的问题,Hive Table 中即可能是 csv,也可能是 parquet 文件格式,用户在做一些 metadata 的修改时,需要清楚的知道自己所操作 Table 的很多属性,针对这个痛点,Iceberg 提出了 everything can be a table 的概念,期望用 Iceberg Table 来统一所有的 Table。

而 Hudi 提出的则是批流两种计算模型的折中方案,Delta 我了解的不算太多,但是总体跟 Hudi 比较类似。目前 Apache Iceberg 也在积极地做 Row-Level Update,也就是类似 Hudi 的 upsert 功能。

虽然出发点不同, 但是三种框架无一例外都是指向了 Hive 这个统治数仓数十年,但是数十年来变化并不大的框架,随着数十年来 Hadoop 生态的发展,Hadoop 生态支持的数据量、数据类型都有一个很大的提升,以 Hive 做数仓必然是比较简单,但是 Hive 本身对 Table 中的内容掌控度是比较小的。 以仓储为例,Hive 相当于只是提供了一个仓库,但是没有利用仓库中的内容去做一些优化,大家只是把东西放到仓库里,但是仓库的东西一多,大家找东西就会比较乱,而新兴的数据湖框架,既提供了一个仓库的功能,同时还给仓库配上了标签信息、监控工具、智能运输等功能,即使仓库装的很满,用户也可以轻松根据标签定位到具体的货架。

Apache Hudi 0.12.0版本发布

Presto-Hudi 连接器

从 PrestoDB 0.275 版本开始,用户现在可以利用原生 Hudi 连接器来查询 Hudi 表。它与 Hive 连接器中的 Hudi 支持相当。要了解有关连接器使用的更多信息,请查看 prestodb 文档

存档点以外的存档

Hudi 支持保存点和恢复功能,这对备份和灾难恢复场景很有用。更多信息查看这里。在 0.12.0 之前,给定表的归档在第一次保存点提交之后就无法再次提交,但是社区已经要求放宽这个限制,以便可以在活动时间线中保留一些粗粒度的提交并执行时间点查询。因此在 0.12.0 中用户现在可以通过启用 hoodie.archive.beyond.savepoint写入配置,让存档在保存点提交之后继续进行,这为 Hudi 用户开启了新的机遇。例如通过每天为较旧的提交添加一个保存点(假设 > 30 天),可以将提交保留多年。并使用as.of.instant和任何较旧的保存点提交查询 hudi 表。这样 Hudi 不需要在活动时间线中为较旧的提交保留每个提交。

注意:如果启用此功能,则无法支持还原。 此限制将在未来的版本中放宽,可以在 HUDI-4500 中跟踪此功能的开发。

基于文件系统的锁

对于使用乐观并发控制的多个写入器,Hudi 已经支持基于Zookeeper、Hive Metastore 或 Amazon DynamoDB。 在0.12.0版本中,新添加基于文件系统的锁。 不像需要其他锁提供者中的外部系统,此实现基于原子获取/释放锁底层文件系统的创建/删除操作。 要使用此锁,用户需要设置以下配置

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider

Deltastreamer 终止策略

用户现在可以使用 deltastreamer 连续模式配置写入后终止策略。例如如果连续 5 次没有来自源的新数据,用户可以配置优雅关闭。这是终止策略的接口。

/**
 * Post write termination strategy for deltastreamer in continuous mode.
 */
public interface PostWriteTerminationStrategy 

  /**
   * Returns whether deltastreamer needs to be shutdown.
   * @param scheduledCompactionInstantAndWriteStatuses optional pair of scheduled compaction instant and write statuses.
   * @return true if deltastreamer has to be shutdown. false otherwise.
   */
  boolean shouldShutdown(Option<Pair<Option<String>, JavaRDD<WriteStatus>>> scheduledCompactionInstantAndWriteStatuses);

这可能有助于引导新表,与其做一个批量加载或bulk_insert,利用大型集群写入大量数据,不如在所有数据都被引导后,在连续模式下启动deltastreamer并添加一个关闭策略来终止。 这样每个批次可以更小,并且可能不需要大型集群来引导数据,Hudi内置一个开箱即用的具体实现,NoNewDataTerminationStrategy。 用户可以随意实施他们认为合适的策略。

Spark 3.3 支持

0.12.0添加了 Spark 3.3 支持,使用 Spark 3.3 的用户可以使用 hudi-spark3.3-bundle或 hudi-spark3-bundle。将继续支持 Spark 3.2、Spark 3.1 和 Spark 2.4。 请查看迁移指南以获取bundle更新

Spark SQL 支持改进

  • 通过调用Call Procedure支持升级、降级、引导、清理、回滚和修复。
  • 支持分析表。
  • 通过 Spark SQL 支持创建/删除/显示/刷新索引语法。

Flink 1.15 支持

Flink 1.15.x 与 Hudi 集成,编译代码时使用配置文件参数 -Pflink1.15 适配版本。 或者使用 hudi-flink1.15-bundle。Flink 1.14 和 Flink 1.13 将继续得到支持,请查看迁移指南以获取bundle更新

Flink 集成改进

  • 批处理模式读取支持数据跳过,设置 SQL 选项 metadata.enabledhoodie.metadata.index.column.stats.enable和 read.data.skipping.enabled为 true 以启用它。
  • 添加了一个基于 HMS 的 Flink 目录,目录标识符为 hudi。可以直接通过 API 实例化目录,也可以使用 CREATE CATALOG语法来创建它。指定目录选项 'mode' = 'hms'以切换到 HMS 目录。 默认情况下,目录处于 dfs模式。
  • Flink INSERT 操作支持异步Clustering,设置 SQL 选项 clustering.schedule.enabled和 clustering.async.enabled 为 true 以启用它。 启用此功能时将异步连续调度Clustering子管道,以将小文件连续合并为更大的文件。

性能改进

这个版本带来了更多的改进,使 Hudi 成为性能最好的湖存储格式。 一些显着的改进是:

  • 通过 Spark Datasource与 sql 缩小了写入的性能差距。 以前数据源写入速度更快。
  • 所有内置密钥生成器都实现了更高性能的 Spark 特定 API。
  • 将批量插入操作中的 UDF 替换为 RDD 转换以降低 serde 成本。
  • 优化了数据跳过中的列统计索引性能。

我们最近将 Hudi 与 TPC-DS 工作负载进行了基准测试。 请查看我们的博客了解更多详情。

迁移指南

在此版本中,下面列出了一些 API 和配置更新,这些更新保证了新的表格版本。 因此,最新的表版本是 5。对于旧版本的现有 Hudi 表,将自动执行一次性升级步骤。 在升级到 Hudi 0.12.0 之前,请注意以下更新。

配置更新

在此版本中,一些配置的默认值已更改。它们如下:

  • hoodie.bulkinsert.sort.mode:此配置用于确定批量插入记录的排序模式。它的默认值已从 GLOBAL_SORT 更改为 NONE,这意味着不进行排序,并且在开销方面与 spark.write.parquet() 匹配。
  • hoodie.datasource.hive_sync.partition_value_extractor:此配置用于在 Hive 同步期间提取和转换分区值。其默认值已从 SlashEncodedDayPartitionValueExtractor更改为 MultiPartKeysValueExtractor。如果您依赖之前的默认值(即没有明确设置),则需要将配置设置为 org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor。从此版本开始,如果未设置此配置并启用 Hive 同步,则将根据分区字段数以及是否启用 Hive 样式分区自动推断分区值提取器类。
  • 如果未手动设置,将从其他配置的值推断以下配置:
  • META_SYNC_BASE_FILE_FORMAT:从 org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT 推断
  • META_SYNC_ASSUME_DATE_PARTITION:从 org.apache.hudi.common.config.HoodieMetadataConfig.ASSUME_DATE_PARTITIONING 推断
  • META_SYNC_DECODE_PARTITION:从 org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING 推断
  • META_SYNC_USE_FILE_LISTING_FROM_METADATA:从 org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE 推断

API 更新

在 SparkKeyGeneratorInterface中,getRecordKeyAPI 的返回类型已从 String 更改为 UTF8String。

// Before
String getRecordKey(InternalRow row, StructType schema); 


// After
UTF8String getRecordKey(InternalRow row, StructType schema);

Fallback分区

如果分区字段值为 null,则 Hudi 具有回退机制,而不是使写入失败。 在 0.9.0 之前,__HIVE_DEFAULT_PARTITION__被用作备用分区。 在 0.9.0 之后,由于一些重构,fallback 分区更改为default分区,此默认分区不适用于某些查询引擎。 因此我们将备用分区从 0.12.0 切换到 __HIVE_DEFAULT_PARTITION__。 我们添加了一个升级步骤,如果现有的 Hudi 表有一个名为 default的分区,我们将无法升级。 用户应将此分区中的数据重写到名为 __HIVE_DEFAULT_PARTITION__分区中。 但是如果您有意将分区命名为默认分区,则可以使用配置 hoodie.skip.default.partition.validation绕过它。

Bundle更新

  • hudi-aws-bundle 从 hudi-utilities-bundle 或 hudi-spark-bundle 中提取与 aws 相关的依赖项。 为了使用 Glue 同步、Cloudwatch 指标报告器或 DynamoDB 锁提供程序等功能,用户需要提供 hudi-aws-bundle jar 以及 hudi-utilities-bundle 或 hudi-spark-bundle jar。
  • 添加了 Spark 3.3 支持; 使用 Spark 3.3 的用户可以使用 hudi-spark3.3-bundle 或 hudi-spark3-bundle(旧版包名称)。
  • Spark 3.2 将继续通过 hudi-spark3.2-bundle 支持
  • Spark 3.1 将继续通过 hudi-spark3.1-bundle 支持
  • Spark 2.4 将继续通过 hudi-spark2.4-bundle 或 hudi-spark-bundle(旧包名称)支持
  • 增加 Flink 1.15 支持; 使用 Flink 1.15 的用户可以使用 hudi-flink1.15-bundle
  • Flink 1.14 将继续通过 hudi-flink1.14-bundle 支持
  • Flink 1.13 将继续通过 hudi-flink1.13-bundle 支持

感谢

感谢参与0.12.0版本的所有贡献者,欢迎广大数据湖爱好者加入Apache Hudi社区,欢迎star & fork https://github.com/apache/hudi

以上是关于Apache Hudi - 初步了解的主要内容,如果未能解决你的问题,请参考以下文章

Apache Hudi 0.12.0版本发布

Apache Hudi 0.12.0版本发布

底层基于Apache Hudi的DLA最佳实践 | 海量低成本日志分析

初步了解网站压力测试工具

nginx的初步了解

数据湖之Hudi:Apache Hudi 快速发展