在 Spark 2.0 中加载压缩的 gzipped csv 文件

Posted

技术标签:

【中文标题】在 Spark 2.0 中加载压缩的 gzipped csv 文件【英文标题】:Loading compressed gzipped csv file in Spark 2.0 【发布时间】:2016-11-02 10:37:14 【问题描述】:

如何在 Spark 2.0 上的 Pyspark 中加载 gzip 压缩的 csv 文件?

我知道一个未压缩的csv文件可以如下加载:

spark.read.format("csv").option("header",          
                                "true").load("myfile.csv")

spark.read.option("header", "true").csv("myfile.csv")

【问题讨论】:

小心 gzip 压缩的 CSV 文件——每个文件都有 1 个分区,因为它们不可拆分。有关更多信息,请参阅此问题/答案:***.com/questions/40336485/… 请注意,这会导致性能问题,不知何故 gzip 文件是不可分割的,并且可能导致在单个集群中执行!以下是一些您可能想要参考的主题snappishproductions.com/2015/09/28/…***.com/questions/40492967/… 【参考方案1】:

我刚刚发现以下内容适用于压缩的 csv 文件:

spark.read.option("header", "true").csv("myfile.csv")

【讨论】:

您是否尝试过使用多个csv.gzip 文件的解决方案?如果这行得通,那就太棒了。 您可以使用* 通配符 - df = spark.read.option("header", "true").csv("some_path/*.gz")。它也适用于多个文件夹-df = spark.read.option("header", "true").csv("some_path/*/*.gz")【参考方案2】:

您可以使用spark.sparkContext.textFile("file.gz")

文件扩展名应为.gz

【讨论】:

这会产生一个 rdd,而不是一个 DataFrame。是否有直接读入 DataFrame 而不必将 rdd 转换为 DataFrame 的方法? 其实没关系,以下适用于 gzip 压缩的 csv 文件:spark.read.option("header", "true").csv("myfile.csv") 感谢您的回复.. @Shankar 然而,这个选项只给我 gz 文件中的文件名,而不是那个文件的内容 编辑:我必须用适当的小写来更正扩展名。它是大写的。谢谢。【参考方案3】:

我不确定在撰写这里的答案和我提出这个问题时这是否发生了变化,但我想插入我的发现,以供我自己和其他遇到同样问题的人将来参考。我正在将 GZIP 压缩的 CSV 文件加载到 Spark 版本 2.4.7 和 python 版本 3.7.4 上的 PySpark DataFrame 中,在 Google 的托管 Spark-As-A-Service 产品(即“Dataproc”)中。如果您想进一步研究规范,则底层 Dataproc 映像版本是 1.5-debian10。

我的问题是如果所有输入仍然是乱码,我无法成功读取 CSV。我可以通过更改文件名的结尾来进行一个小调整,使文件后缀为.gz,然后一切正常。这是重现问题的代码。

# This is a shell script to get a dummy file created with 2 different endings
echo 'foo,bar,baz' > test.csv
gzip test.csv
# So now there are 2 files with 2 endings
cp test.csv.gz test_csv

然后我可以运行 pyspark 作业甚至是交互式 pyspark 会话(如下图所示),然后验证 spark 不会智能地检测文件类型,因为它会查看文件名并根据文件名解释文件类型.

$ pyspark
Python 3.7.4 (default, Aug 13 2019, 20:35:49) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  `_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.7.4 (default, Aug 13 2019 20:35:49)
SparkSession available as 'spark'.
>>> filename_noend = 'test_csv'
>>> filename_end = 'test.csv.gz'
>>> schema = 'field1 string,field2 string,field3 string'
>>> df_noend = spark.read.csv(path=filename_noend, schema=schema, header=False)
>>> df_noend.show()
+--------------------+-------------+------+
|              field1|       field2|field3|
+--------------------+-------------+------+
���`test.cs...|�*.�+T+
                      |  null|
+--------------------+-------------+------+

>>> df_end = spark.read.csv(path=filename_end, schema=schema, header=False)
>>> df_end.show()
+------+------+------+
|field1|field2|field3|
+------+------+------+
|   foo|   bar|   baz|
+------+------+------+
>>> exit()

遗憾的是,没有办法指定 compression='gzip' 之类的东西。因此,以.gz 结尾保存您的 gzip 压缩文件,您就可以开始了!

【讨论】:

以上是关于在 Spark 2.0 中加载压缩的 gzipped csv 文件的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中加载非标准格式的 CSV

无法在面向 .Net Standard 2.0 的项目中加载 Revit API

Spark 1.6 在数据帧保持分区字段中加载特定分区

在非 Spark 环境中加载 pyspark ML 模型

使用 sparklyr 时无法在本地 Spark 连接中加载 .csv 数据

如何在 Spark Scala 作业中加载和写入属性文件?