更改 Pyspark rdd 中 saveAsTextFile 选项中的分隔符

Posted

技术标签:

【中文标题】更改 Pyspark rdd 中 saveAsTextFile 选项中的分隔符【英文标题】:Change the delimiter in saveAsTextFile Option in Pyspark rdd 【发布时间】:2017-06-04 17:27:02 【问题描述】:

我的数据集在 HDFS 中可用。我正在阅读它并执行过滤操作。

dir = sc.textFile('/datasets/DelayedFlights.csv').filter(lambda x: 
int(x.split(',')[24]) == 1).map(lambda y: y.split(','))
The output of above operation is
[u'1763', u'2008', u'1', u'3', u'4', u'922.0', u'915', u'', u'1050', u'WN', 
u'1069', u'N630WN', u'', u'95.0', u'', u'', u'7.0', u'SAN', u'SMF', u'480', 
u'', u'12.0', u'0', u'N', u'1', u'', u'', u'', u'', u'']
[u'1911', u'2008', u'1', u'3', u'4', u'2325.0', u'1900', u'', u'2030', 
u'WN', u'2092', u'N302SW', u'', u'90.0', u'', u'', u'265.0', u'SFO', u'SAN', 
u'447', u'', u'11.0', u'0', u'N', u'1', u'', u'', u'', u'', u'']
[u'2651', u'2008', u'1', u'4', u'5', u'1949.0', u'1905', u'', u'1910', 
u'WN', u'1403', u'N504SW', u'', u'65.0', u'', u'', u'44.0', u'BOI', u'RNO', 
u'335', u'', u'11.0', u'0', u'N', u'1', u'', u'', u'', u'', u'']

我想使用带有制表符分隔符的 saveAsTextFile 将上述文件保存到 HDFS 路径 谁能告诉我如何在python中将分隔符从逗号更改为制表符

【问题讨论】:

【参考方案1】:

实现此目的的一种方法是将 RDD 转换为数据帧,并以 csv 格式保存数据帧,并将分隔符选项设置为选项卡,如下所示。

rdd = spark.sparkContext.parallelize([['1763', '2008', '1', '3', '4', '922.0'], ['1763', '2008', '1', '3', '4', '922.0'], ['1763', '2008', '1', '3', '4', '922.0']])
df = spark.createDataFrame(rdd.map(lambda x: tuple(x)))
df.write.format('com.databricks.spark.csv').option("delimiter", '\t').save('/path/to/csv/file/')

如果您不想将 rdd 转换为数据帧,请按照以下 sn-p 操作。

rdd.map(lambda x: '\t'.join(x)).saveAsTextFile('test_dir/output')

建议使用 DataFrame 方法而不是上述方法。

【讨论】:

你能告诉我为什么你使用 tuple(x) 来创建数据框而不是 createDataFrame(rdd,schema) 这是因为元组的每个元素都将被视为单独的列。例如[('a1', 'b1', 'c1'), ('a2', 'b2', 'c2'), ('a3', 'b3', 'c3')] 这将创建一个包含 3 列的 DataFrame.. 我收到此错误 TypeError: 'DataFrameWriter' object is not callable 你的 SPARK 是什么版本? 我使用的是 spark 1.6.1

以上是关于更改 Pyspark rdd 中 saveAsTextFile 选项中的分隔符的主要内容,如果未能解决你的问题,请参考以下文章

当我在 AWS EMR Studio 中使用 saveAsTable 保存 PySpark DataFrame 时,它​​会保存在哪里?

映射 dict(来自 rdd)以递归方式更改 Python/PySpark 中的列名

PySpark 重新分区 RDD 元素

在 pyspark RDD 上应用地图功能

Pyspark Dataframe:无法保存为 Hive 表

我可以将 pyspark 数据框保存为哪些文件格式?