如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?

Posted

技术标签:

【中文标题】如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?【英文标题】:How to append to a csv file using df.write.csv in pyspark? 【发布时间】:2016-12-19 07:29:36 【问题描述】:

我正在尝试使用 df.write.csv 将数据附加到我的 csv 文件中。这是我在关注 spark 文档 http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter 后所做的:

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append'

执行上面的代码给我错误:

NameError: name 'append' not defined

没有附加,错误:

路径已经存在。

【问题讨论】:

有sqlcsvA.csv调用的文件吗? 是的,输出被复制到sqlcsvA.csv文件。 你能从代码中删除并重新创建这个文件吗? 您是否要求在代码中添加删除选项,然后每次程序运行时都应创建一个新文件? 【参考方案1】:
df.write.save(path='csv', format='csv', mode='append', sep='\t')

【讨论】:

这再次将输出拆分为不同的文件。它被分区了。 在写入之前包含.coalesce(1),它会阻止分区,但不确定结果是否会被追加! df.coalesce(1).write.save(path='csv', format='csv', mode='append', sep='\t') 谢谢。这将所有内容都集中到一个文件中。【参考方案2】:

来自文档: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter 从 v1.4 开始

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

例如

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

如果您想编写单个文件,您可以在其中任何一行上使用 coalesce 或 repartition。哪一行都没有关系,因为数据帧只是一个 DAG 执行,在写入 csv 之前不会执行任何操作。 repartition & coalesce 有效地使用相同的代码,但合并只能减少分区数量,repartition 也可以增加它们。为简单起见,我会坚持使用repartition

例如

df1 = sqlContext.createDataFrame(query1).repartition(1)

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

我认为文档中的示例不是很好,它们没有显示使用路径以外的参数的示例。

参考你尝试过的两件事:

(append)

为此,需要有一个名为 append 的字符串变量,其中包含值“append”。 DataFrameWriter 库中没有名为 append 的字符串常量。 即你可以在你的代码中更早地添加它,然后它就可以工作了。 append = "追加"

('mode=append')

为此,csv 方法必须解析出mode=append 字符串以获取模式的值,当您可以只拥有一个值恰好为“append”或“的参数时,这将是额外的工作。覆盖”需要提取。 None 是特例,Python 内置,不是 pyspark 特有的。

另一方面,我建议尽可能使用命名参数。 例如

csv(path="/path/to/file.csv", mode="append")

而不是位置参数

csv("/path/to/file.csv", "append")

它更清晰,有助于理解。

【讨论】:

【参考方案3】:

我不了解 Python,但在 Scala 和 Java 中可以通过以下方式设置保存模式:

df.write.mode("append").csv("pathToFile")

我认为它在 Python 中应该是类似的。 This 可能会有所帮助。

【讨论】:

我试过你在python中说的。但是我的输出的每一行都被复制到一个名为sqlcsvA.csv 的文件夹中的单独的csv 文件中。它们不会被复制到一个单独的 csv 文件中。 @kaks,看来您必须手动合并这些文件。看看这个question。例如,人们在 Java 中使用FileUtil.copyMerge。 @kaks,请注意,如果您读回结果(在 Spark 中),这些文件将被合并,并且您拥有一个包含该目录中所有文件的数据的 DataFrame。 不需要手动合并,写的时候用.repartition(1)即可。当您将文件读回数据框时,从技术上讲,它不会合并它们,因为数据框分布在集群中。每个文件将是数据框分区的基础。所以从某种意义上说,你确实有一个数据框,但它仍然存在于许多基础部分中。

以上是关于如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 不允许我创建存储桶

使用 pyspark(查询)从 Hadoop 中删除文件

即使使用 PySpark 存在表,如何写入 Microsoft SQL Server 表

使用 pySpark 将 DataFrame 写入 mysql 表

在 PySpark 中将数据帧写入 CSV 后重命名文件 [重复]

pyspark write.parquet() 创建一个文件夹而不是 parquet 文件