如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?
Posted
技术标签:
【中文标题】如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?【英文标题】:How to append to a csv file using df.write.csv in pyspark? 【发布时间】:2016-12-19 07:29:36 【问题描述】:我正在尝试使用 df.write.csv
将数据附加到我的 csv 文件中。这是我在关注 spark 文档 http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter 后所做的:
from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append'
执行上面的代码给我错误:
NameError: name 'append' not defined
没有附加,错误:
路径已经存在。
【问题讨论】:
有sqlcsvA.csv调用的文件吗? 是的,输出被复制到sqlcsvA.csv
文件。
你能从代码中删除并重新创建这个文件吗?
您是否要求在代码中添加删除选项,然后每次程序运行时都应创建一个新文件?
【参考方案1】:
df.write.save(path='csv', format='csv', mode='append', sep='\t')
【讨论】:
这再次将输出拆分为不同的文件。它被分区了。 在写入之前包含.coalesce(1)
,它会阻止分区,但不确定结果是否会被追加! df.coalesce(1).write.save(path='csv', format='csv', mode='append', sep='\t')
谢谢。这将所有内容都集中到一个文件中。【参考方案2】:
来自文档: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter 从 v1.4 开始
csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)
例如
from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")
如果您想编写单个文件,您可以在其中任何一行上使用 coalesce 或 repartition
。哪一行都没有关系,因为数据帧只是一个 DAG 执行,在写入 csv 之前不会执行任何操作。 repartition
& coalesce
有效地使用相同的代码,但合并只能减少分区数量,repartition
也可以增加它们。为简单起见,我会坚持使用repartition
。
例如
df1 = sqlContext.createDataFrame(query1).repartition(1)
或
df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")
我认为文档中的示例不是很好,它们没有显示使用路径以外的参数的示例。
参考你尝试过的两件事:
(append)
为此,需要有一个名为 append 的字符串变量,其中包含值“append”。 DataFrameWriter 库中没有名为 append 的字符串常量。 即你可以在你的代码中更早地添加它,然后它就可以工作了。 append = "追加"
('mode=append')
为此,csv 方法必须解析出mode=append
字符串以获取模式的值,当您可以只拥有一个值恰好为“append”或“的参数时,这将是额外的工作。覆盖”需要提取。 None 是特例,Python 内置,不是 pyspark 特有的。
另一方面,我建议尽可能使用命名参数。 例如
csv(path="/path/to/file.csv", mode="append")
而不是位置参数
csv("/path/to/file.csv", "append")
它更清晰,有助于理解。
【讨论】:
【参考方案3】:我不了解 Python,但在 Scala 和 Java 中可以通过以下方式设置保存模式:
df.write.mode("append").csv("pathToFile")
我认为它在 Python 中应该是类似的。 This 可能会有所帮助。
【讨论】:
我试过你在python中说的。但是我的输出的每一行都被复制到一个名为sqlcsvA.csv
的文件夹中的单独的csv 文件中。它们不会被复制到一个单独的 csv 文件中。
@kaks,看来您必须手动合并这些文件。看看这个question。例如,人们在 Java 中使用FileUtil.copyMerge。
@kaks,请注意,如果您读回结果(在 Spark 中),这些文件将被合并,并且您拥有一个包含该目录中所有文件的数据的 DataFrame。
不需要手动合并,写的时候用.repartition(1)
即可。当您将文件读回数据框时,从技术上讲,它不会合并它们,因为数据框分布在集群中。每个文件将是数据框分区的基础。所以从某种意义上说,你确实有一个数据框,但它仍然存在于许多基础部分中。以上是关于如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章
即使使用 PySpark 存在表,如何写入 Microsoft SQL Server 表
使用 pySpark 将 DataFrame 写入 mysql 表