在 PySpark DataFrame 中添加汇总输出作为新行
Posted
技术标签:
【中文标题】在 PySpark DataFrame 中添加汇总输出作为新行【英文标题】:Add output of rollup as a new row in a PySpark DataFrame 【发布时间】:2018-04-11 19:43:13 【问题描述】:我正在将 sql 代码转换为 Pyspark。
sql 代码使用汇总来汇总每个状态的计数。
我正在尝试在 pyspark 中做同样的事情,但不知道如何获取总计数行。
我有一个包含州、城市和计数的表格,我想在州部分的末尾添加每个州的总计数。
这是一个示例输入:
State City Count
WA Seattle 10
WA Tacoma 11
MA Boston 11
MA Cambridge 3
MA Quincy 5
这是我想要的输出:
State City Count
WA Seattle 10
WA Tacoma 11
WA Total 21
MA Boston 11
MA Cambridge 3
MA Quincy 5
MA Total 19
我不知道如何在状态之间添加总计数。
我确实尝试过汇总,这是我的代码:
df2=df.rollup('STATE').count()
结果显示如下:
State Count
WA 21
MA 19
但我想要每个状态之后的总数。
【问题讨论】:
Spark 在 SQL 和DataFrame
API (***.com/q/37975227/9613318) 中都支持 Roolup。你有遇到什么问题吗?
是的,我确实尝试了汇总,但它最终只得到了总计数的状态。我想要状态之间的总计数行
@yokielove 如果你能分享你引用的 sql 代码会很有帮助。
【参考方案1】:
由于您希望 Total
作为 DataFrame 中的新行,一个选项是合并 groupBy()
的结果并按 ["State", "City", "Count"]
排序(以确保 "Total"
行在每个组中显示在最后):
import pyspark.sql.functions as f
df.union(
df.groupBy("State")\
.agg(f.sum("Count").alias("Count"))\
.select("State", f.lit("Total").alias("City"), "Count")
).sort("State", "City", "Count").show()
#+-----+---------+-----+
#|State| City|Count|
#+-----+---------+-----+
#| MA| Boston| 11|
#| MA|Cambridge| 3|
#| MA| Quincy| 5|
#| MA| Total| 19|
#| WA| Seattle| 10|
#| WA| Tacoma| 11|
#| WA| Total| 21|
#+-----+---------+-----+
【讨论】:
【参考方案2】:要么:
df.groubpBy("State", "City").rollup(count("*"))
或者只是注册表:
df.createOrReplaceTempView("df")
并应用您当前的 SQL 查询
spark.sql("...")
【讨论】:
第一个 df.groubpBy("State", "City").rollup(count("*")) 返回错误:AttributeError: 'GroupedData' object has no attribute 'rollup' 不应该是df.rollup("State", "City").agg(count("*"))
吗?
它仍然无法正常工作,结果显示每个州没有总计数以上是关于在 PySpark DataFrame 中添加汇总输出作为新行的主要内容,如果未能解决你的问题,请参考以下文章
如何将字典中的多个值添加到 PySpark Dataframe
想将key添加到pyspark dataFrame的爆炸数组中
PySpark 根据第二个 DataFrame 的列向一个 DataFrame 添加值