当我在 pyspark 中收集它们时,为啥我的 `binaryFiles` 是空的?

Posted

技术标签:

【中文标题】当我在 pyspark 中收集它们时,为啥我的 `binaryFiles` 是空的?【英文标题】:Why are my `binaryFiles` empty when I collect them in pyspark?当我在 pyspark 中收集它们时,为什么我的 `binaryFiles` 是空的? 【发布时间】:2016-07-07 23:06:36 【问题描述】:

我在同一文件夹中的 hdfs 上有两个 zip 文件:/user/path-to-folder-with-zips/

我将它传递给 pyspark 中的“二进制文件”:

zips = sc.binaryFiles('/user/path-to-folder-with-zips/')

我正在尝试解压缩 zip 文件并对其中的文本文件进行处理,因此我尝试查看处理 RDD 时的内容。我是这样做的:

zips_collected = zips.collect()

但是,当我这样做时,它会给出一个空列表:

>> zips_collected
[]

我知道拉链不是空的——它们有文本文件。文档here 说

每个文件被读取为单个记录,并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。

我在这里做错了什么?我知道我无法查看文件的内容,因为它是压缩的,因此是二进制的。但是,我至少应该能够看到 SOMETHING。为什么它不返回任何东西?

每个 zip 文件可以有多个文件,但内容始终是这样的:

rownum|data|data|data|data|data
rownum|data|data|data|data|data
rownum|data|data|data|data|data

【问题讨论】:

我突然想到这可能是 zip 文件的 type 的问题。如何找出这是什么类型的 zip 文件? 【参考方案1】:

我假设每个 zip 文件都包含一个文本文件(多个文本文件的代码很容易更改)。在逐行处理之前,您需要先通过io.BytesIO 读取 zip 文件的内容。解决方案大致基于https://***.com/a/36511190/234233。

import io
import gzip

def zip_extract(x):
    """Extract *.gz file in memory for Spark"""
    file_obj = gzip.GzipFile(fileobj=io.BytesIO(x[1]), mode="r")
    return file_obj.read()

zip_data = sc.binaryFiles('/user/path-to-folder-with-zips/*.zip')
results = zip_data.map(zip_extract) \
                  .flatMap(lambda zip_file: zip_file.split("\n")) \
                  .map(lambda line: parse_line(line))
                  .collect()

【讨论】:

我的代码就是这样,几乎一字不差。但它会引发错误。这就是为什么我想我会运行“collect”,只是为了看看 RDD 中是否有任何东西。但它返回空列表?这就是我开始调试的方式。 每个 zip 文件中有多少个文本文件 -- 1 个或更多?另外,你能发布一个小样本的zip文件吗?如果数据是专有的,那很好——改用人工数据。 当然,这只是一堆 psv 文件。每个文件大约为 10 MB 另外,您的示例适用于 gzip 文件。我正在使用 zip 文件

以上是关于当我在 pyspark 中收集它们时,为啥我的 `binaryFiles` 是空的?的主要内容,如果未能解决你的问题,请参考以下文章

我在 pandas 中设置了我的数据类型,但是在转换为 pyspark 时,所有数据都转到了字符串

为啥我的应用程序不能以 pandas_udf 和 PySpark+Flask 开头?

从pyspark手动调用spark的垃圾收集

当我尝试从单链表中删除节点时,为啥我的 C 程序会崩溃

当我在网格/面中实现索引时,为啥它会返回 OpenGL 错误?

Heroku/Django 部署:为啥我在成功部署和静态收集时收到错误 500?