将多个 S3 文件夹/路径读入 PySpark

Posted

技术标签:

【中文标题】将多个 S3 文件夹/路径读入 PySpark【英文标题】:Reading Multiple S3 Folders / Paths Into PySpark 【发布时间】:2017-09-15 13:03:08 【问题描述】:

我正在使用 PySpark 进行大数据分析。我可以使用以下命令导入存储在特定存储桶的特定文件夹中的所有 CSV 文件:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('file:///home/path/datafolder/data2014/*.csv')

(其中 * 类似于通配符)

我遇到的问题如下:

    如果我想对 2014 年和 2015 年的数据进行分析,即文件 1 是 .load('file:///home/path/SFweather/data2014/*.csv'),文件 2 是 .load('file:///home/path/SFweather/data2015/*.csv'),文件 3 是 .load('file:///home/path/NYCweather/data2014/*.csv'),文件 4 是 .load('file:///home/path/NYCweather/data2015/*.csv'),该怎么办?如何同时导入多个路径以获取一个数据帧?我是否需要将它们全部单独存储为数据框,然后在 PySpark 中将它们连接在一起? (您可以假设它们所有的 CSV 都具有相同的架构) 假设现在是 2014 年 11 月。如果我想再次运行分析,但在“最新数据”上运行,例如dec14 什么时候是 2014 年 12 月?例如,我想在 12 月 14 日加载文件 2:.load('file:///home/path/datafolder/data2014/dec14/*.csv'),并使用此文件:.load('file:///home/path/datafolder/data2014/nov14/*.csv') 进行原始分析。有没有办法安排 Jupyter 笔记本(或类似笔记本)更新加载路径并导入最新运行(在这种情况下,“nov14”将被“dec14”替换,然后“jan15”等)。

我查看了之前的问题,但由于这是 AWS / PySpark 集成特定的问题,因此无法找到答案。

提前感谢您的帮助!

[背景:我已经获得了来自不同团队的许多 S3 存储桶的访问权限,这些存储桶包含各种大数据集。将其复制到我的 S3 存储桶,然后构建一个 Jupyter 笔记本似乎比直接从存储桶中提取数据并在其上构建模型/表/等并将处理后的输出保存到数据库中的工作要多得多。因此,我发布了上面的问题。如果我的想法完全错误,请阻止我! :)]

【问题讨论】:

【参考方案1】:

只要文件格式相同,您就可以使用通配符读取多个路径。

在你的例子中:

.load('file:///home/path/SFweather/data2014/*.csv')
.load('file:///home/path/SFweather/data2015/*.csv')
.load('file:///home/path/NYCweather/data2014/*.csv')
.load('file:///home/path/NYCweather/data2015/*.csv')

您可以将上面的 4 个加载语句替换为以下路径,以一次将所有 csv 读取到一个数据帧:

.load('file:///home/path/*/*/*.csv')

如果您想更具体以避免读取某些文件/文件夹,您可以执行以下操作:

.load('file:///home/path/[SF|NYC]weather/data201[4|5]/*.csv')

【讨论】:

感谢您的快速回复。 1.我只是按照上面的第一个示例附加另一个 .load ?我避免使用类似 path/*/*/*.csv 的原因是因为数据很大(请参阅我关于“数据运行”的第二个问题并且只捕获最新运行)。 2. 是否有条件限制,即如果这些是我想要的文件夹名称,我可以做类似 [SF|NYC|LON]weather 甚至只是 [SF|NYC|LON] 之类的事情。我对“避免读取文件/文件夹”的评论有点失望,但这个例子似乎包括了它。可能是我误会了,所以提前为此道歉。 1.我创建的每个加载语句都将替换您的 4 个加载语句。 2. 是的,您可以添加任意数量的条件 [1|2|3|4|...],spark 只会读取匹配的文件。 太棒了!认为你为我节省了很多额外的工作和成本。 :) 会尝试一下,如果我遇到麻烦,请告诉你。再次感谢! 此命令适用于 CSV 文件,但我无法在 .txt.gz 文件上复制该命令。是否有命令可以帮助我加载 .txt.gz 文件的多个路径、推断架构/文件有标题并让我执行类似的分析? 上述解决方案有效。一个问题:如果我有一个 S3 子文件夹,其中包含多个记录“数据运行”的文件夹,并且文件夹的标题是一个数字,我如何只选择前 3 个最高的数字(这样所有的“数据运行”不是进口的)。例如,子文件夹包含文件夹 1000、1005、1050、1101 和 1060。我只想选择 1050、1101 和 1060(最高 3 个数字)。这有什么诀窍吗?

以上是关于将多个 S3 文件夹/路径读入 PySpark的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?

Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在

如何将 S3 部分文件读入 pandas 数据框

在s3中使用pyspark合并多个小json文件[重复]

加载多个文件并且缺少一个文件时,PySpark 作业失败 [重复]

通过 pyspark 加载文件名中包含冒号的 Amazon S3 文件