从文件系统有条件地加载分区

Posted

技术标签:

【中文标题】从文件系统有条件地加载分区【英文标题】:Conditional loading of partitions from file-system 【发布时间】:2020-08-03 17:28:47 【问题描述】:

我知道在 pySparks .load()-function 中存在关于通配符的问题,例如 here 或 here。 无论如何,我发现的所有问题/答案都没有涉及我的变体。

上下文

在 pySpark 中,我想直接从 HDFS 加载文件,因为我必须为 Spark 2.3.x 使用 databricks avro-library。我这样做是这样的:

partition_stamp = "202104"

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition=partition_stamp*") \
        .select("...")

如您所见,分区源自yyyyMMdd 格式的时间戳

问题

目前我只获得 2021 年 4 月使用的所有分区 (partition_stamp = "202104")。 但是,我需要从 2021 年 4 月开始的所有分区。

用伪代码编写,我需要一个类似这样的解决方案:

.load(f"/path/partition >= partition_stamp*")

由于实际上存在数百个分区,因此以任何需要硬编码的方式进行操作都是没有用的。

所以我的问题是:有条件文件加载功能吗?

【问题讨论】:

【参考方案1】:

据我所知,在 .load()-function 中动态处理路径只有以下选项:

*:  Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
1,2,3: Set-like inclusion of a defined set of characters -> (/path/202001,2,3/...)

因此,回答我的问题:没有用于条件文件加载的内置函数。


无论如何,我想为您提供我的解决方案:
import pandas as pd # Utilize pandas date-functions

partition_stamp = ",".join((set(
                        str(_range.year) + ":02".format(_range.month) 
                        for _range in pd.date_range(start=start_date, end=end_date, freq='D')
                 )))

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition=partition_stamp*") \
        .select("...")

这样,yyyyMM 格式的时间戳的限制是针对给定的开始日期和结束日期动态生成的,并且基于字符串的.load() 仍然可用。

【讨论】:

以上是关于从文件系统有条件地加载分区的主要内容,如果未能解决你的问题,请参考以下文章

linux文件系统详解

Linux 系统引导过程

Linux常见面试题2

如何有条件地检测操作系统并加载ZSH设置?

linux文件系统

LINUX系统重新安装