在 delta Lake 中高效读取/转换分区数据

Posted

技术标签:

【中文标题】在 delta Lake 中高效读取/转换分区数据【英文标题】:Efficient reading/transforming partitioned data in delta lake 【发布时间】:2020-11-13 19:00:41 【问题描述】:

我的数据位于 ADLS 的一个三角洲湖中,并正在通过 Databricks 读取它。数据按年份和日期划分,z 按 storeIdNum 排序,其中大约有 10 个 store Id #s,每个日期有几百万行。当我阅读它时,有时我正在阅读一个日期分区(约 2000 万行),有时我正在阅读一整月或一整年的数据以进行批处理操作。我有一个第二个小得多的表,每个日期大约有 75,000 行,它也是按 storeIdNum 排序的 z 并且我的大部分操作都涉及将较大的数据表连接到 storeIdNum 上的较小表(以及一些其他各种字段 - 例如时间窗口,较小的表是按小时汇总的,另一个表每秒都有数据点)。当我读入表格时,我加入它们并执行一系列操作(group by、window by 和 partition by 以及 lag/lead/avg/dense_rank 函数等)。

我的问题是:我应该在所有的 join、group by 和 partition by 语句中都有日期吗? 每当我读取一个日期的数据时,我总是有年份和日期读取数据的语句,因为我知道我只想从某个分区(或一年的分区)中读取,但是引用分区 col 也很重要。在 windows 和 group bus 中以提高效率,还是这是多余的?在分析/转换之后,我不会覆盖/修改我正在读取的数据,而是写入一个新表(可能在相同的列上分区),以防万一。

例如:

dfBig = spark.sql("SELECT YEAR, DATE, STORE_ID_NUM, UNIX_TS, BARCODE, CUSTNUM, .... FROM STORE_DATA_SECONDS WHERE YEAR = 2020 and DATE='2020-11-12'")
dfSmall = spark.sql("SELECT YEAR, DATE, STORE_ID_NUM, TS_HR, CUSTNUM, .... FROM STORE_DATA_HRS WHERE YEAR = 2020 and DATE='2020-11-12'")

现在,如果我加入他们,我是想在加入中包含 YEAR 和 DATE,还是应该只加入 STORE_ID_NUM(然后是我需要加入的任何时间戳字段/客户 ID 号字段)?我绝对需要 STORE_ID_NUM,但如果它只是添加另一列并使其效率更低,我可以放弃 YEAR AND DATE,因为它需要加入更多的东西。我不知道它到底是如何工作的,所以我想通过前面的连接来检查,也许我在做操作时没有使用分区,所以效率更低?谢谢!

【问题讨论】:

【参考方案1】:

delta 的关键是要很好地选择分区列,这可能需要一些试验和错误,如果你想优化响应的性能,我学到的一个技术是选择一个低基数的过滤列(你知道问题是否与时间序列有关,它将是日期,另一方面,如果是关于所有客户的报告,在这种情况下,选择您的城市可能会很方便),请记住,如果您使用 delta 每个分区表示文件结构的一个级别,其基数将是目录数。

在你的情况下,我发现按 YEAR 分区很好,但我会添加 MONTH,因为记录的数量有助于动态修剪 spark

如果表与另一个表相比非常小,您可以尝试的另一件事是使用 BRADCAST JOIN。

Broadcast Hash Join en Spark (ES)

Join Strategy Hints for SQL Queries

后一个链接解释了动态修剪如何帮助 MERGE 操作。

How to improve performance of Delta Lake MERGE INTO queries using partition pruning

【讨论】:

以上是关于在 delta Lake 中高效读取/转换分区数据的主要内容,如果未能解决你的问题,请参考以下文章

Delta Lake中CDC的实现

Delta Lake中CDC的实现

Delta Lake 提供纯 ScalaJavaPython 操作 API,和 Flink 整合更加容易

pyspark delta-lake 元存储

Apache Spark + Delta Lake 概念

如何使用Delta Lake构建批流一体数据仓库