在 PySpark DataFrame 中添加汇总输出作为新行

Posted

技术标签:

【中文标题】在 PySpark DataFrame 中添加汇总输出作为新行【英文标题】:Add output of rollup as a new row in a PySpark DataFrame 【发布时间】:2018-04-11 19:43:13 【问题描述】:

我正在将 sql 代码转换为 Pyspark。

sql 代码使用汇总来汇总每个状态的计数。

我正在尝试在 pyspark 中做同样的事情,但不知道如何获取总计数行。

我有一个包含州、城市和计数的表格,我想在州部分的末尾添加每个州的总计数。

这是一个示例输入:

State   City      Count
WA      Seattle    10
WA      Tacoma     11
MA      Boston     11
MA      Cambridge  3
MA      Quincy     5

这是我想要的输出:

State   City       Count
 WA     Seattle    10
 WA     Tacoma     11
 WA     Total      21
 MA     Boston     11
 MA     Cambridge  3
 MA     Quincy     5
 MA     Total      19

我不知道如何在状态之间添加总计数。

我确实尝试过汇总,这是我的代码:

df2=df.rollup('STATE').count()

结果显示如下:

State  Count
 WA     21
 MA     19

但我想要每个状态之后的总数。

【问题讨论】:

Spark 在 SQL 和 DataFrame API (***.com/q/37975227/9613318) 中都支持 Roolup。你有遇到什么问题吗? 是的,我确实尝试了汇总,但它最终只得到了总计数的状态。我想要状态之间的总计数行 @yokielove 如果你能分享你引用的 sql 代码会很有帮助。 【参考方案1】:

由于您希望 Total 作为 DataFrame 中的新行,一个选项是合并 groupBy() 的结果并按 ["State", "City", "Count"] 排序(以确保 "Total" 行在每个组中显示在最后):

import pyspark.sql.functions as f
df.union(
    df.groupBy("State")\
    .agg(f.sum("Count").alias("Count"))\
    .select("State", f.lit("Total").alias("City"), "Count")
).sort("State", "City", "Count").show()
#+-----+---------+-----+
#|State|     City|Count|
#+-----+---------+-----+
#|   MA|   Boston|   11|
#|   MA|Cambridge|    3|
#|   MA|   Quincy|    5|
#|   MA|    Total|   19|
#|   WA|  Seattle|   10|
#|   WA|   Tacoma|   11|
#|   WA|    Total|   21|
#+-----+---------+-----+

【讨论】:

【参考方案2】:

要么:

df.groubpBy("State", "City").rollup(count("*"))

或者只是注册表:

df.createOrReplaceTempView("df")

并应用您当前的 SQL 查询

spark.sql("...")

【讨论】:

第一个 df.groubpBy("State", "City").rollup(count("*")) 返回错误:AttributeError: 'GroupedData' object has no attribute 'rollup' 不应该是df.rollup("State", "City").agg(count("*"))吗? 它仍然无法正常工作,结果显示每个州没有总计数

以上是关于在 PySpark DataFrame 中添加汇总输出作为新行的主要内容,如果未能解决你的问题,请参考以下文章

如何将字典中的多个值添加到 PySpark Dataframe

想将key添加到pyspark dataFrame的爆炸数组中

向 pyspark Dataframe 添加新行

PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值

从另一个 DataFrame 将列添加到 Pyspark DataFrame

计算每行并在 DataFrame PySpark 中添加新列 - 更好的解决方案?