如何在 apache spark 中读取最新的分区
Posted
技术标签:
【中文标题】如何在 apache spark 中读取最新的分区【英文标题】:how to read most recent partition in apache spark 【发布时间】:2019-07-10 03:48:38 【问题描述】:我使用了包含查询的数据框
df : Dataframe =spark.sql(s"show Partitions $yourtablename")
现在分区的数量每天都在变化,因为它每天都在运行。
主要关心的是我需要获取最新的分区。
假设我获得了特定日期的随机表的分区 喜欢
year=2019/month=1/day=1
year=2019/month=1/day=10
year=2019/month=1/day=2
year=2019/month=1/day=21
year=2019/month=1/day=22
year=2019/month=1/day=23
year=2019/month=1/day=24
year=2019/month=1/day=25
year=2019/month=1/day=26
year=2019/month=2/day=27
year=2019/month=2/day=3
现在您可以看到它对分区进行排序的功能,以便在day=1
之后出现day=10
。这会产生一个问题,因为我需要获取最新的分区。
我已经设法通过使用来获取分区
val df =dff.orderby(col("partition").desc.limit(1)
但这给了我尾 -1 分区,而不是最新的分区。
如何从表中获取最新的分区,以克服 hives 对排列分区的限制?
所以假设在上面的例子中我需要拿起
year=2019/month=2/day=27
而不是
year=2019/month=2/day=3
这是表中的最后一个分区。
【问题讨论】:
我会通过使用谓词下推的 s""" 的适当查询来读取分区。 【参考方案1】:您可以从SHOW PARTITIONS
获取最大分区数
spark.sql("SHOW PARTITIONS my_database.my_table").select(max('partition)).show(false)
【讨论】:
这在 HDFS 或 S3 的情况下不起作用。 @ChrisIvan 有几种方法可以使它工作。一个例子是使用saveAsTable()
。
如果您的目标是避免为了找到最新的分区而在 S3 中昂贵地加载所有分区,这正是我要解决的问题。我通过使用 boto3 通过 S3 API 查找最新的分区键进行管理,然后在 WHERE 条件下对其进行硬编码以使用下推谓词。这很老套,但它有效。【参考方案2】:
我不会依赖位置依赖,但如果你这样做,我至少会有 year=2019/month=2/day=03.
我会通过 SQL 语句依赖分区修剪和 SQL。我不确定你是否在使用 ORC、PARQUET 等,但分区修剪应该是个不错的选择。
例如
val df = sparkSession.sql(""" select max(partition_col)
from randomtable
""")
val maxVal = df.first().getString(0) // this as sql result is a DF
另见https://mapr.com/blog/tips-and-best-practices-to-take-advantage-of-spark-2-x/
【讨论】:
我认为这样做的效果是强制spark扫描所有文件以确定最大分区日期,这在大量文件和分区的情况下相当慢。跨度>以上是关于如何在 apache spark 中读取最新的分区的主要内容,如果未能解决你的问题,请参考以下文章
Spark - 从 S3 读取分区数据 - 分区是如何发生的?