Pyspark:将df写入具有特定名称的文件,绘制df

Posted

技术标签:

【中文标题】Pyspark:将df写入具有特定名称的文件,绘制df【英文标题】:Pyspark: write df to file with specific name, plot df 【发布时间】:2017-06-19 15:01:15 【问题描述】:

我正在使用最新版本的 Spark(2.1.1)。我通过 spark.read.csv 将多个 csv 文件读取到数据帧。 使用此数据框处理后,如何将其保存到具有特定名称的输出 csv 文件。

例如,有 100 个输入文件(in1.csv、in2.csv、in3.csv、...in100.csv)。 属于 in1.csv 的行应保存为 in1-result.csv。属于 in2.csv 的行应保存为 in2-result.csv 等。(默认文件名会像 part-xxxx-xxxxxx 这样不可读)

我见过 partitionBy(col) 但看起来它只能按列分区。

另一个问题是我想绘制我的数据框。 Spark 没有内置的绘图库。许多人使用 df.toPandas() 转换为 pandas 并绘制它。有没有更好的解决方案?由于我的数据非常大并且 toPandas() 会导致内存错误。我正在服务器上工作,想将绘图保存为图像而不是显示。

【问题讨论】:

How to save a spark DataFrame as csv on disk?的可能重复 我不是问如何正常保存到磁盘。我想用特定的名称保存它。前任。 in1.csv 中的行将写为 in1-result.csv。 (正是这个名字不是部分xxxx) spark 在后台使用 hadoop,除非您使用 MultipleTextOutputFormat 和字符串的 RDD 将其保存为 hadoop 文件,否则在 spark 中没有开箱即用的解决方案 当涉及到绘图时,我认为使用 toPandas 是一个不错的方法,但您需要先聚合您的 DataFrame,这样转换为 Pandas 并不是什么大问题。您宁愿不想绘制 DataFrame 中的所有行,因为它不可读。 感谢你们两位的cmets。 【参考方案1】:

我建议在与输入文件相关的特定目录中编写 DataFrame 的以下解决方案:

在每个文件的循环中: 读取 csv 文件 使用withColumn 转换添加包含有关输入文件信息的新列 使用union 转换合并所有数据帧 进行必要的预处理 使用partitionBy保存结果,通过向列提供输入文件信息,这样与同一输入文件相关的行将保存在同一输出目录中

代码可能如下所示:

all_df = None
for file in files: # where files is list of input CSV files that you want to read
    df = spark.read.csv(file)
    df.withColumn("input_file", file)
    if all_df is None:
        all_df = df
    else:
        all_df = all_df.union(df)

# do preprocessing

result.write.partitionBy(result.input_file).csv(outdir)

【讨论】:

非常感谢您提出这个好主意。我对您的解决方案做类似的事情。我的代码如下所示: df = df.withColumn("filename", input_file_name()) 然后 df.write.partitionBy("filename").format('csv').save("mypath") 文件名是仍然是部分 xxxx,但输出文件夹与输入匹配仍然很棒。

以上是关于Pyspark:将df写入具有特定名称的文件,绘制df的主要内容,如果未能解决你的问题,请参考以下文章

Python pyspark 将 DF 写入 .csv 并存储在本地 C 盘

PySpark 根据特定列重新分区

PySpark foreachPartition 并行写入数据库

如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

拆分特定的 PySpark df 列并创建另一个 DF

将 pyspark Rdd 写入 csv 文件