如何在 pyspark 中读取大的 zip 文件

Posted

技术标签:

【中文标题】如何在 pyspark 中读取大的 zip 文件【英文标题】:How to read large zip files in pyspark 【发布时间】:2019-03-28 14:29:04 【问题描述】:

我在 s3 上确实有 n 个 .zip 文件,我想处理并从中提取一些数据。 zip 文件包含一个 json 文件。在 Spark 中,我们可以读取 .gz 文件,但我没有找到任何方法来读取 .zip 文件中的数据。有人可以帮助我如何使用 python 在 spark 上处理大型 zip 文件。我遇到了一些像 newAPIHadoopFile 这样的选项,但没有得到任何运气,也没有找到在 pyspark 中实现它们的方法。请注意,zip 文件大于 1G,有些也是 20G。

下面是我使用的代码:

import zipfile
import io
file_name = "s3 file path for zip file"

def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


zips = sc.binaryFiles(file_name)
files_data = zips.map(zip_extract)

但由于以下原因,它失败了。我使用的实例是 r42x.large。

Exit code: 52
Stack trace: ExitCodeException exitCode=52: 
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0

【问题讨论】:

How to open/stream .zip files through Spark?的可能重复 已经看过了,还是不行。 简单地添加您尝试过的代码,以及您得到的错误。那太好了。 我和你有同样的问题。我想知道你有没有解决这个问题? 我刚刚发布了答案。我做了流式处理 zip 文件的方法。 【参考方案1】:

我确实以块的形式读取了 zip 文件的内容,并使用 spark 处理了这些块。这对我有用,并帮助我阅读大小超过 10G 的 zip 文件。下面是示例集:

max_data_length=10000
z = zipfile.ZipFile(zip_file)
data = []
counter=1
with z.open(z.infolist()[0]) as f:
    line_counter=0
    for line in f:
        # Append file contents to list
        data.append(line)
        line_counter=line_counter+1
        # Reset counters if record count hit max-data-length threshold
        # Create spark dataframes
        if not line_counter % max_data_length:          
            # Spark processing like:
            df_rdd = spark.sparkContext.parallelize(data)

            # Reset Counters and data-list
            counter=counter+1
            line_counter=0
            data= []

【讨论】:

你能详细说明一下吗?我看到控件永远不会进入 if 条件,因此数据永远不会添加到 rdd!如果数据很大并且确实进入了,则再次重置计数器,并且再次将结束数据写入rdd。你找到正确的解决方案了吗?

以上是关于如何在 pyspark 中读取大的 zip 文件的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 使用 Spark 读取巨大 Json 文件的第一行

如何使用 Pyspark 组合两个 Dstream(类似于普通 RDD 上的 .zip)

如何在不使用临时文件的情况下从 Java 中的嵌套 zip 文件中读取数据?

在 pyspark 中使用 zip

如何在 pyspark 数据框中读取 csv 文件时读取选定的列?

如何在 pyspark 中启用 csv 文件的多行读取