如何在 apache spark 中读取最新的分区

Posted

技术标签:

【中文标题】如何在 apache spark 中读取最新的分区【英文标题】:how to read most recent partition in apache spark 【发布时间】:2019-07-10 03:48:38 【问题描述】:

我使用了包含查询的数据框

df : Dataframe =spark.sql(s"show Partitions $yourtablename")

现在分区的数量每天都在变化,因为它每天都在运行。

主要关心的是我需要获取最新的分区。

假设我获得了特定日期的随机表的分区 喜欢

year=2019/month=1/day=1
year=2019/month=1/day=10
year=2019/month=1/day=2
year=2019/month=1/day=21
year=2019/month=1/day=22
year=2019/month=1/day=23
year=2019/month=1/day=24
year=2019/month=1/day=25
year=2019/month=1/day=26
year=2019/month=2/day=27
year=2019/month=2/day=3

现在您可以看到它对分区进行排序的功能,以便在day=1 之后出现day=10。这会产生一个问题,因为我需要获取最新的分区。

我已经设法通过使用来获取分区

val df =dff.orderby(col("partition").desc.limit(1)

但这给了我尾 -1 分区,而不是最新的分区。

如何从表中获取最新的分区,以克服 hives 对排列分区的限制?

所以假设在上面的例子中我需要拿起

 year=2019/month=2/day=27

而不是

year=2019/month=2/day=3

这是表中的最后一个分区。

【问题讨论】:

我会通过使用谓词下推的 s""" 的适当查询来读取分区。 【参考方案1】:

您可以从SHOW PARTITIONS获取最大分区数

spark.sql("SHOW PARTITIONS my_database.my_table").select(max('partition)).show(false)

【讨论】:

这在 HDFS 或 S3 的情况下不起作用。 @ChrisIvan 有几种方法可以使它工作。一个例子是使用saveAsTable() 如果您的目标是避免为了找到最新的分区而在 S3 中昂贵地加载所有分区,这正是我要解决的问题。我通过使用 boto3 通过 S3 API 查找最新的分区键进行管理,然后在 WHERE 条件下对其进行硬编码以使用下推谓词。这很老套,但它有效。【参考方案2】:

我不会依赖位置依赖,但如果你这样做,我至少会有 year=2019/month=2/day=03.

我会通过 SQL 语句依赖分区修剪和 SQL。我不确定你是否在使用 ORC、PARQUET 等,但分区修剪应该是个不错的选择。

例如

 val df = sparkSession.sql(""" select max(partition_col)
                                 from randomtable 
                           """)

 val maxVal = df.first().getString(0) // this as sql result is a DF

另见https://mapr.com/blog/tips-and-best-practices-to-take-advantage-of-spark-2-x/

【讨论】:

我认为这样做的效果是强制spark扫描所有文件以确定最大分区日期,这在大量文件和分区的情况下相当慢。跨度>

以上是关于如何在 apache spark 中读取最新的分区的主要内容,如果未能解决你的问题,请参考以下文章

如何在使用 Spark 读取时将数据分配到 X 分区?

Spark - 从 S3 读取分区数据 - 分区是如何发生的?

源码级解读如何解决Spark-sql读取hive分区表执行效率低问题

获取分区镶木地板数据帧的最新模式

从 Spark 替换 hive 分区

Spark读取HDFS数据分区参考