Pyspark:将df写入具有特定名称的文件,绘制df
Posted
技术标签:
【中文标题】Pyspark:将df写入具有特定名称的文件,绘制df【英文标题】:Pyspark: write df to file with specific name, plot df 【发布时间】:2017-06-19 15:01:15 【问题描述】:我正在使用最新版本的 Spark(2.1.1)。我通过 spark.read.csv 将多个 csv 文件读取到数据帧。 使用此数据框处理后,如何将其保存到具有特定名称的输出 csv 文件。
例如,有 100 个输入文件(in1.csv、in2.csv、in3.csv、...in100.csv)。 属于 in1.csv 的行应保存为 in1-result.csv。属于 in2.csv 的行应保存为 in2-result.csv 等。(默认文件名会像 part-xxxx-xxxxxx 这样不可读)
我见过 partitionBy(col) 但看起来它只能按列分区。
另一个问题是我想绘制我的数据框。 Spark 没有内置的绘图库。许多人使用 df.toPandas() 转换为 pandas 并绘制它。有没有更好的解决方案?由于我的数据非常大并且 toPandas() 会导致内存错误。我正在服务器上工作,想将绘图保存为图像而不是显示。
【问题讨论】:
How to save a spark DataFrame as csv on disk?的可能重复 我不是问如何正常保存到磁盘。我想用特定的名称保存它。前任。 in1.csv 中的行将写为 in1-result.csv。 (正是这个名字不是部分xxxx) spark 在后台使用 hadoop,除非您使用 MultipleTextOutputFormat 和字符串的 RDD 将其保存为 hadoop 文件,否则在 spark 中没有开箱即用的解决方案 当涉及到绘图时,我认为使用 toPandas 是一个不错的方法,但您需要先聚合您的 DataFrame,这样转换为 Pandas 并不是什么大问题。您宁愿不想绘制 DataFrame 中的所有行,因为它不可读。 感谢你们两位的cmets。 【参考方案1】:我建议在与输入文件相关的特定目录中编写 DataFrame 的以下解决方案:
在每个文件的循环中: 读取 csv 文件 使用withColumn
转换添加包含有关输入文件信息的新列
使用union
转换合并所有数据帧
进行必要的预处理
使用partitionBy
保存结果,通过向列提供输入文件信息,这样与同一输入文件相关的行将保存在同一输出目录中
代码可能如下所示:
all_df = None
for file in files: # where files is list of input CSV files that you want to read
df = spark.read.csv(file)
df.withColumn("input_file", file)
if all_df is None:
all_df = df
else:
all_df = all_df.union(df)
# do preprocessing
result.write.partitionBy(result.input_file).csv(outdir)
【讨论】:
非常感谢您提出这个好主意。我对您的解决方案做类似的事情。我的代码如下所示: df = df.withColumn("filename", input_file_name()) 然后 df.write.partitionBy("filename").format('csv').save("mypath") 文件名是仍然是部分 xxxx,但输出文件夹与输入匹配仍然很棒。以上是关于Pyspark:将df写入具有特定名称的文件,绘制df的主要内容,如果未能解决你的问题,请参考以下文章
Python pyspark 将 DF 写入 .csv 并存储在本地 C 盘
PySpark foreachPartition 并行写入数据库