在 Spark 2.0 中加载压缩的 gzipped csv 文件
Posted
技术标签:
【中文标题】在 Spark 2.0 中加载压缩的 gzipped csv 文件【英文标题】:Loading compressed gzipped csv file in Spark 2.0 【发布时间】:2016-11-02 10:37:14 【问题描述】:如何在 Spark 2.0 上的 Pyspark 中加载 gzip 压缩的 csv 文件?
我知道一个未压缩的csv文件可以如下加载:
spark.read.format("csv").option("header",
"true").load("myfile.csv")
或
spark.read.option("header", "true").csv("myfile.csv")
【问题讨论】:
小心 gzip 压缩的 CSV 文件——每个文件都有 1 个分区,因为它们不可拆分。有关更多信息,请参阅此问题/答案:***.com/questions/40336485/… 请注意,这会导致性能问题,不知何故 gzip 文件是不可分割的,并且可能导致在单个集群中执行!以下是一些您可能想要参考的主题snappishproductions.com/2015/09/28/…***.com/questions/40492967/… 【参考方案1】:我刚刚发现以下内容适用于压缩的 csv 文件:
spark.read.option("header", "true").csv("myfile.csv")
【讨论】:
您是否尝试过使用多个csv.gzip
文件的解决方案?如果这行得通,那就太棒了。
您可以使用*
通配符 - df = spark.read.option("header", "true").csv("some_path/*.gz")
。它也适用于多个文件夹-df = spark.read.option("header", "true").csv("some_path/*/*.gz")
【参考方案2】:
您可以使用spark.sparkContext.textFile("file.gz")
文件扩展名应为.gz
【讨论】:
这会产生一个 rdd,而不是一个 DataFrame。是否有直接读入 DataFrame 而不必将 rdd 转换为 DataFrame 的方法? 其实没关系,以下适用于 gzip 压缩的 csv 文件:spark.read.option("header", "true").csv("myfile.csv")
感谢您的回复.. @Shankar 然而,这个选项只给我 gz 文件中的文件名,而不是那个文件的内容
编辑:我必须用适当的小写来更正扩展名。它是大写的。谢谢。【参考方案3】:
我不确定在撰写这里的答案和我提出这个问题时这是否发生了变化,但我想插入我的发现,以供我自己和其他遇到同样问题的人将来参考。我正在将 GZIP 压缩的 CSV 文件加载到 Spark 版本 2.4.7 和 python 版本 3.7.4 上的 PySpark DataFrame 中,在 Google 的托管 Spark-As-A-Service 产品(即“Dataproc”)中。如果您想进一步研究规范,则底层 Dataproc 映像版本是 1.5-debian10。
我的问题是如果所有输入仍然是乱码,我无法成功读取 CSV。我可以通过更改文件名的结尾来进行一个小调整,使文件后缀为.gz
,然后一切正常。这是重现问题的代码。
# This is a shell script to get a dummy file created with 2 different endings
echo 'foo,bar,baz' > test.csv
gzip test.csv
# So now there are 2 files with 2 endings
cp test.csv.gz test_csv
然后我可以运行 pyspark 作业甚至是交互式 pyspark 会话(如下图所示),然后验证 spark 不会智能地检测文件类型,因为它会查看文件名并根据文件名解释文件类型.
$ pyspark
Python 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ `_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.7
/_/
Using Python version 3.7.4 (default, Aug 13 2019 20:35:49)
SparkSession available as 'spark'.
>>> filename_noend = 'test_csv'
>>> filename_end = 'test.csv.gz'
>>> schema = 'field1 string,field2 string,field3 string'
>>> df_noend = spark.read.csv(path=filename_noend, schema=schema, header=False)
>>> df_noend.show()
+--------------------+-------------+------+
| field1| field2|field3|
+--------------------+-------------+------+
���`test.cs...|�*.�+T+
| null|
+--------------------+-------------+------+
>>> df_end = spark.read.csv(path=filename_end, schema=schema, header=False)
>>> df_end.show()
+------+------+------+
|field1|field2|field3|
+------+------+------+
| foo| bar| baz|
+------+------+------+
>>> exit()
遗憾的是,没有办法指定 compression='gzip'
之类的东西。因此,以.gz
结尾保存您的 gzip 压缩文件,您就可以开始了!
【讨论】:
以上是关于在 Spark 2.0 中加载压缩的 gzipped csv 文件的主要内容,如果未能解决你的问题,请参考以下文章
无法在面向 .Net Standard 2.0 的项目中加载 Revit API