与 groupBy 聚合后将 pyspark 数据帧保存为 csv 文件
Posted
技术标签:
【中文标题】与 groupBy 聚合后将 pyspark 数据帧保存为 csv 文件【英文标题】:Saving pyspark dataframe after being aggregated with groupBy as csv file 【发布时间】:2019-02-07 08:00:43 【问题描述】:我正在学习 pyspark,我对如何将分组数据帧保存为 csv 文件有点困惑(假设由于某些原因——例如 RAM 限制——我不想先将其转换为 Pandas 数据帧)。
对于一个可重现的例子:
import seaborn as sns
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master('local') \
.appName('Data cleaning') \
.getOrCreate()
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
from pyspark.sql.functions import *
mpg= sns.load_dataset('mpg')
mpg_sp = spark.createDataFrame(mpg)
mpg_grp = mpg_sp.groupBy('model_year', 'origin').avg('displacement', 'weight')
# The command below fails in the sense that it creates a folder with multiple files in it rather than a single csv file as I would expect
mpg_grp.write.csv('mpg_grp.csv')
# By applying the collect method I get a list which can not be saved as a csv file
mpg_grp1 = mpg_grp.collect()
type(mpg_grp1)
list
【问题讨论】:
【参考方案1】:上面的答案是正确的,但它的使用效果不是很好。 当然,您可以使用 repartition(1) 或 coalesce(1),但它会导致将所有数据传输到单个工作人员,并且会大大降低您的代码速度。 为了避免这种情况,我建议您在数据集中的一列上对数据进行分区。然后编写简单的代码,每个分区获取一个文件:
cols = ["$name"]
mpg_grp.repartition(cols).write.partitionBy(cols).csv("$location")
因此,数据将按您的一列在工作人员之间进行分区,并且您将在每个分区中获得一个文件(以日期为例)。
【讨论】:
显然 OP 只需要 1 个文件。但除此之外,你的解决方案很好:)【参考方案2】:Spark 是一个分布式框架。因此,多个文件中的输出是正常行为......每个工作人员都会编写它的一部分,从而产生几个小文件。
你可以使用这个命令来欺骗系统:
mpg_grp.coalesce(1).write.csv('mpg_grp.csv')
这将只写入 1 个文件(但仍位于名为“mpg_grp.csv”的文件夹中)。注意:可能会很慢。
【讨论】:
以上是关于与 groupBy 聚合后将 pyspark 数据帧保存为 csv 文件的主要内容,如果未能解决你的问题,请参考以下文章
具有聚合唯一值的pyspark dataframe groupby [重复]
pyspark:groupby 和聚合 avg 和 first 在多个列上
如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?