在 pyspark 中使用 zip

Posted

技术标签:

【中文标题】在 pyspark 中使用 zip【英文标题】:Working with zips in pyspark 【发布时间】:2016-04-05 20:13:12 【问题描述】:

我在一个目录中有 n 个 zip,我想提取其中的每一个,然后从位于 zip 中的一两个文件中提取一些数据并将其添加到图形数据库中。我已经为整个事情制作了一个连续的python脚本,但我坚持将它转换为火花。我所有的 zip 文件都在 HDFS 目录中。而且,他的图形数据库是 Neo4j。我尚未了解如何将 spark 与 neo4j 连接,但我仍停留在更初始的步骤。

我认为我的代码应该遵循这些原则。

# Names of all my zips
zip_names = ["a.zip", "b.zip", "c.zip"]

# function extract_&_populate_graphDB() returns 1 after doing all the work.
# This was done so that a closure can be applied to start the spark job.
sc.parallelize(zip_names).map(extract_&_populate_grapDB).reduce(lambda a, b: a+b)

我无法对此进行测试的是如何提取 zip 并读取其中的文件。我可以通过 sc.textFile 读取 zip,但在其上运行 take(1) 时,它返回了十六进制数据。

那么,是否可以读取 zip 文件并提取数据?或者,我应该在将数据放入 HDFS 之前提取数据吗?或者也许有其他方法可以解决这个问题?

【问题讨论】:

docs.databricks.com/spark/latest/data-sources/zip-files.html 可能会有所帮助 【参考方案1】:

更新答案*

如果您想使用 Gzip 压缩文件,您可以在配置 Spark shell 或 Spark 作业时设置一些参数,以便读取和写入压缩数据。

--conf spark.hadoop.mapred.output.compress=True \
--conf spark.hadoop.mapred.output.compression.codec=True \ 
--conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
--conf spark.hadoop.mapred.output.compression.type: BLOCK

将这些添加到您当前用于创建 shell 的 bash 脚本中(例如 pyspark),您就可以读取和写入压缩数据。

不幸的是,有no innate support of Zip files,所以你需要做更多的工作才能到达那里。

【讨论】:

以上是关于在 pyspark 中使用 zip的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark:如何在Python 3中使用pyspark

PySpark:如何在列中使用 Or 进行分组

如何在 Pyspark 中使用 Scala 函数?

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

在 PySpark 中使用列条件替换空值

Pyspark 使用自定义函数