从文件系统有条件地加载分区
Posted
技术标签:
【中文标题】从文件系统有条件地加载分区【英文标题】:Conditional loading of partitions from file-system 【发布时间】:2020-08-03 17:28:47 【问题描述】:我知道在 pySparks .load()
-function 中存在关于通配符的问题,例如 here 或 here。
无论如何,我发现的所有问题/答案都没有涉及我的变体。
上下文
在 pySpark 中,我想直接从 HDFS 加载文件,因为我必须为 Spark 2.3.x 使用 databricks avro-library。我这样做是这样的:
partition_stamp = "202104"
df = spark.read.format("com.databricks.spark.avro") \
.load(f"/path/partition=partition_stamp*") \
.select("...")
如您所见,分区源自yyyyMMdd
格式的时间戳。
问题
目前我只获得 2021 年 4 月使用的所有分区 (partition_stamp = "202104"
)。
但是,我需要从 2021 年 4 月开始的所有分区。
用伪代码编写,我需要一个类似这样的解决方案:
.load(f"/path/partition >= partition_stamp*")
由于实际上存在数百个分区,因此以任何需要硬编码的方式进行操作都是没有用的。
所以我的问题是:有条件文件加载功能吗?
【问题讨论】:
【参考方案1】:据我所知,在 .load()
-function 中动态处理路径只有以下选项:
*: Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
1,2,3: Set-like inclusion of a defined set of characters -> (/path/202001,2,3/...)
因此,回答我的问题:没有用于条件文件加载的内置函数。
无论如何,我想为您提供我的解决方案:
import pandas as pd # Utilize pandas date-functions
partition_stamp = ",".join((set(
str(_range.year) + ":02".format(_range.month)
for _range in pd.date_range(start=start_date, end=end_date, freq='D')
)))
df = spark.read.format("com.databricks.spark.avro") \
.load(f"/path/partition=partition_stamp*") \
.select("...")
这样,yyyyMM
格式的时间戳的限制是针对给定的开始日期和结束日期动态生成的,并且基于字符串的.load()
仍然可用。
【讨论】:
以上是关于从文件系统有条件地加载分区的主要内容,如果未能解决你的问题,请参考以下文章