Pyspark - 多列聚合

Posted

技术标签:

【中文标题】Pyspark - 多列聚合【英文标题】:Pyspark - Aggregation on multiple columns 【发布时间】:2016-03-27 18:28:07 【问题描述】:

我有如下数据。文件名:babynames.csv。

year    name    percent     sex
1880    John    0.081541    boy
1880    William 0.080511    boy
1880    James   0.050057    boy

我需要根据年份和性别对输入进行排序,并且我希望输出汇总如下(此输出将分配给新的 RDD)。

year    sex   avg(percentage)   count(rows)
1880    boy   0.070703         3

我不确定在 pyspark 中执行以下步骤后如何继续。在这方面需要你的帮助

testrdd = sc.textFile("babynames.csv");
rows = testrdd.map(lambda y:y.split(',')).filter(lambda x:"year" not in x[0])
aggregatedoutput = ????

【问题讨论】:

【参考方案1】:
    按照the README 的说明添加spark-csv package

    加载数据

    df = (sqlContext.read
        .format("com.databricks.spark.csv")
        .options(inferSchema="true", delimiter=";", header="true")
        .load("babynames.csv"))
    

    导入所需函数

    from pyspark.sql.functions import count, avg
    

    分组和聚合(可选使用Column.alias:

    df.groupBy("year", "sex").agg(avg("percent"), count("*"))
    

或者

percent 转换为数字 重塑为一种格式((yearsex),percentaggregateByKey 使用 pyspark.statcounter.StatCounter

【讨论】:

SparkSQL: apply aggregate functions to a list of column | Multiple Aggregate operations on the same column of a spark dataframe.

以上是关于Pyspark - 多列聚合的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:groupby 和聚合 avg 和 first 在多个列上

python + pyspark:在pyspark中进行多列比较的内部连接错误

多列上的多个聚合

Pyspark:过滤多列上的行

pyspark将单列转换为多列[重复]

PySpark 函数基于多列数据框创建自定义输出