当我在 pyspark 中收集它们时,为啥我的 `binaryFiles` 是空的?
Posted
技术标签:
【中文标题】当我在 pyspark 中收集它们时,为啥我的 `binaryFiles` 是空的?【英文标题】:Why are my `binaryFiles` empty when I collect them in pyspark?当我在 pyspark 中收集它们时,为什么我的 `binaryFiles` 是空的? 【发布时间】:2016-07-07 23:06:36 【问题描述】:我在同一文件夹中的 hdfs 上有两个 zip 文件:/user/path-to-folder-with-zips/
。
我将它传递给 pyspark 中的“二进制文件”:
zips = sc.binaryFiles('/user/path-to-folder-with-zips/')
我正在尝试解压缩 zip 文件并对其中的文本文件进行处理,因此我尝试查看处理 RDD 时的内容。我是这样做的:
zips_collected = zips.collect()
但是,当我这样做时,它会给出一个空列表:
>> zips_collected
[]
我知道拉链不是空的——它们有文本文件。文档here 说
每个文件被读取为单个记录,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
我在这里做错了什么?我知道我无法查看文件的内容,因为它是压缩的,因此是二进制的。但是,我至少应该能够看到 SOMETHING。为什么它不返回任何东西?
每个 zip 文件可以有多个文件,但内容始终是这样的:
rownum|data|data|data|data|data
rownum|data|data|data|data|data
rownum|data|data|data|data|data
【问题讨论】:
我突然想到这可能是 zip 文件的 type 的问题。如何找出这是什么类型的 zip 文件? 【参考方案1】:我假设每个 zip 文件都包含一个文本文件(多个文本文件的代码很容易更改)。在逐行处理之前,您需要先通过io.BytesIO
读取 zip 文件的内容。解决方案大致基于https://***.com/a/36511190/234233。
import io
import gzip
def zip_extract(x):
"""Extract *.gz file in memory for Spark"""
file_obj = gzip.GzipFile(fileobj=io.BytesIO(x[1]), mode="r")
return file_obj.read()
zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')
results = zip_data.map(zip_extract) \
.flatMap(lambda zip_file: zip_file.split("\n")) \
.map(lambda line: parse_line(line))
.collect()
【讨论】:
我的代码就是这样,几乎一字不差。但它会引发错误。这就是为什么我想我会运行“collect
”,只是为了看看 RDD 中是否有任何东西。但它返回空列表?这就是我开始调试的方式。
每个 zip 文件中有多少个文本文件 -- 1 个或更多?另外,你能发布一个小样本的zip文件吗?如果数据是专有的,那很好——改用人工数据。
当然,这只是一堆 psv 文件。每个文件大约为 10 MB
另外,您的示例适用于 gzip
文件。我正在使用 zip 文件以上是关于当我在 pyspark 中收集它们时,为啥我的 `binaryFiles` 是空的?的主要内容,如果未能解决你的问题,请参考以下文章
我在 pandas 中设置了我的数据类型,但是在转换为 pyspark 时,所有数据都转到了字符串
为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?