如何使用 pyspark 获取 Delta 表的所有当前分区?
Posted
技术标签:
【中文标题】如何使用 pyspark 获取 Delta 表的所有当前分区?【英文标题】:How to get all the current partitions of a Delta Table using pyspark? 【发布时间】:2021-02-22 19:52:28 【问题描述】:我正在使用 delta Lake 的 OSS 版本以及 spark 3.0.1。我当前的用例要求我发现给定增量表中的所有当前分区。
我的数据存储在'./data/raw'
中,并由sensorId
列分区(提到的路径是我的python 脚本的相对路径)。
我正在尝试使用documentation 中提到的SHOW PARTITIONS
语法。但是,我遇到了错误。
这就是我的代码的样子:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TestScript").getOrCreate()
df=spark.sql("SHOW PARTITIONS delta.`./data/raw`")
df.show()
spark-submit 命令如下所示:
spark-submit --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" test_script.py
我收到以下错误:
pyspark.sql.utils.AnalysisException: Database 'delta' not found;
我的另一个与此相关的问题是SHOW PARTITIONS
是否会给我所有的分区,或者它会限制结果。如果有限制,发现/获取增量表的所有分区的最佳方法是什么。
【问题讨论】:
你在使用 Databricks 吗?表是否保存为增量表?您提供的路径是为 delta 表保存物理 parquet 文件的路径? 问题是关于 OSS Delta,而不是 Databricks 的... @AlexOtt 我应该删除标签吗?我想既然 delta 是由 Databricks 开发的,我可以接触到相关的受众。 【参考方案1】:您可以通过多种方式查看分区。您提供的文档链接向您展示了执行此操作的方法。
如果您已将数据保存为增量表,则可以通过提供表名而不是增量路径来获取分区信息,它将返回分区信息。
spark.sql("SHOW Partitions schema.tableName").show()
您还可以使用该选项指定表的物理文件所在的路径。在您的情况下,您正在根据您的 python 脚本所在的位置传递相对路径,但这不起作用。您需要传递 S3 存储桶或您正在使用的任何存储空间的确切路径才能使其工作。
spark.sql("SHOW Partitions delta.`dbfs:/mnt/S3/tables/data/raw`").show()
回答你的最后一个问题 Show partitions 是否会给你所有的分区。答案是肯定的,但如果您检查使用 df.show() if 只会显示前 20 行。
如果您想查看表的所有行/分区,您可以对数据框进行计数,然后将其作为第二个参数传递给 show 方法。
val count = df.count()
df.show(count, truncate = False)
【讨论】:
尝试使用绝对路径和dbfs
前缀,但仍然出现相同的错误。我不确定这个解决方案是否会起作用,因为我没有在 databricks 环境中使用 delta。另外,我更喜欢基于位置路径的解决方案,因为我们的代码将在 k8 环境中运行,并且数据将在一个持久卷中,该卷将被挂载到特定位置。
使用基于您的解决方案的绝对路径将不起作用,即使您使用 dbfs 作为前缀。您需要在 databricks 环境中使用 delta 才能使其正常工作。如果您的数据在 S3 存储桶中,那么您可以将 S3 存储桶路径安装到数据块并使用它,如我在答案中所示。您将能够从 K8 env 中获取该路径,因为它将位于 S3 中。
我们没有在我们的解决方案中使用云组件。它将是本地 fs 或 nfs 挂载。因此,根据您的 cmets,我了解 databricks 环境之外的 OSS delta 无法支持上述功能。对吗?
我不知道OSS delta中是否存在这种东西,因为我没有尝试过。以上是关于如何使用 pyspark 获取 Delta 表的所有当前分区?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Zeppelin notebook 和 pyspark 中导入 Delta Lake 模块?
Pyspark-SQL 与 Pyspark 使用 Delta 格式的查询表有啥区别?
如何在 Pyspark Dataframe 中创建多列的所有成对组合?