pyspark delta湖优化 - 无法解析SQL

Posted

技术标签:

【中文标题】pyspark delta湖优化 - 无法解析SQL【英文标题】:pyspark delta lake optimize - fails to parse SQL 【发布时间】:2020-08-28 14:44:54 【问题描述】:

我有一个使用 spark 3.x 和 delta 0.7.x 创建的 delta 表:

data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("tmp/delta-table")
# add some more files
data = spark.range(20, 100)
data.write.format("delta").mode("append").save("tmp/delta-table")

df = spark.read.format("delta").load("tmp/delta-table")
df.show()

现在日志中生成了相当多的文件(很多时候parquet文件太小了)。

%ls tmp/delta-table

我想压缩它们:

df.createGlobalTempView("my_delta_table")
spark.sql("OPTIMIZE my_delta_table ZORDER BY (id)")

失败:

ParseException: 
mismatched input 'OPTIMIZE' expecting '(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'(line 1, pos 0)

== SQL ==
OPTIMIZE my_delta_table ZORDER BY (id)
^^^

问题:

    如何在不使查询失败的情况下使其正常工作(优化) 有没有比调用基于文本的 SQL 更原生的 API?

注意:

spark is started like this:

import pyspark
from pyspark.sql import SparkSession

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from delta.tables import *

【问题讨论】:

【参考方案1】:

OPTIMIZE 在 OSS Delta Lake 中不可用。如果您想压缩文件,可以按照Compact files 部分中的说明进行操作。如果您想使用ZORDER,目前您需要使用Databricks Runtime。

-- 编辑--

但它seems under development。

【讨论】:

但是:docs.delta.io/latest/delta-utility.htmlOPTIMIZE 列为支持?甚至:spark.sql("OPTIMIZE my_delta_table") 失败。但是,github.com/delta-io/delta/issues/368 看起来您的答案到今天仍然是正确的。 确实,已确认。它没有被实施。延伸阅读材料:github.com/databricks/tech-talks/tree/master/…和youtube.com/watch?v=u1VfOiHVeMI df.repartition(10, col("foo"), col("bar)).sortWithinPartitions,也可能是repartitionByRange,在 OSS 中可能是一个很好的近似值。【参考方案2】:

如果您在本地运行 Delta,则意味着您必须使用 OSS Delta Lake。优化命令仅适用于 Databricks Delta Lake。要在 OSS 中进行文件压缩,您可以执行以下操作 - https://docs.delta.io/latest/best-practices.html#compact-files

【讨论】:

我正在尝试在 delta 表(流式接收器)上运行压缩,其中 spark 作业每 5 分钟在该表上执行 MERGE。在对表执行压缩时,如果在同一个表上发生更新,它是如何工作的?我们应该等到压缩成功执行后再恢复更新还是 spark 可以管理它?此外,是否有任何指南可用于考虑重新分区号?我的意思是,我有 1000 个小文件,最好重新分区到 10 甚至 5?

以上是关于pyspark delta湖优化 - 无法解析SQL的主要内容,如果未能解决你的问题,请参考以下文章

databricks delta 格式文件无法读取

如何在 Zeppelin notebook 和 pyspark 中导入 Delta Lake 模块?

“数据湖三剑客”Hudi、Delta Lake和Iceberg 深度对比

数据湖06:Delta Lake原理和功能概述

数据湖06:Delta Lake原理和功能概述

从Delta 2.0开始聊聊我们需要怎样的数据湖