将groupBy聚合为csv文件后保存pyspark数据帧

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将groupBy聚合为csv文件后保存pyspark数据帧相关的知识,希望对你有一定的参考价值。

我正在学习pyspark,我对如何将分组数据帧保存为csv文件感到困惑(假设由于某些原因 - 例如RAM限制 - 我不想将它首先转换为Pandas数据帧)。

有关可重复的示例:

import seaborn as sns
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder 
.master('local') 
.appName('Data cleaning') 
.getOrCreate()
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
from pyspark.sql.functions import *

mpg= sns.load_dataset('mpg')
mpg_sp = spark.createDataFrame(mpg)
mpg_grp = mpg_sp.groupBy('model_year', 'origin').avg('displacement', 'weight')

# The command below fails in the sense that it creates a folder with multiple  files in it rather than a single csv file as I would expect

mpg_grp.write.csv('mpg_grp.csv')

# By applying the collect method I get a list which can not be saved as a csv file

mpg_grp1 = mpg_grp.collect()
type(mpg_grp1)
list
答案

Spark是一个分布式框架。因此,几个文件中的输出是正常行为......每个工作人员都会编写它的部分,从而产生几个小文件。

您可以使用此命令欺骗系统:

mpg_grp.coalesce(1).write.csv('mpg_grp.csv')

这将只写1个文件(但仍然在名为'mpg_grp.csv'的文件夹中)。 警告:可能很慢。

另一答案

以上答案是正确的,但其使用效果不是很好。 当然,您可以使用重新分区(1)或合并(1),但这会导致将所有数据传输给单个工作人员,并且会大大减慢代码速度。 为了避免这种情况,我建议您在数据集中的某个列上对数据进行分区。然后编写简单的代码以获得每个分区一个文件:

cols = ["$name"]
mpg_grp.repartition(cols).write.partitionBy(cols).csv("$location")

因此,数据将按照您的某个列在工作程序之间进行分区,并且每个分区只能获得一个文件(以日期为例)。

以上是关于将groupBy聚合为csv文件后保存pyspark数据帧的主要内容,如果未能解决你的问题,请参考以下文章

将 pandas groupby 对象保存到 csv 文件中

将 spark 数据帧聚合转换为 SQL 查询; window、groupby 的问题,以及如何聚合?

Parquet 文件上 groupby 的最佳实践

使用 pandas GroupBy 和时间序列重采样的平均聚合

CSV文件保存数字为文本形式后再次打开后数字任然显示为科学计数法的问题?

csv文件,excel保存后再打开乱码。