PySpark 汇总数据框
Posted
技术标签:
【中文标题】PySpark 汇总数据框【英文标题】:PySpark rolling up a dataframe 【发布时间】:2019-09-19 21:21:19 【问题描述】:我有以下数据框-
>>> my_df.show(3)
+------------+---------+-------+--------------+
| user_id| address| type|count| country|
+------------+---------+-------+-----+--------+
| ABC123| yyy,USA| animal| 2| USA|
| ABC123| xxx,USA| animal| 3| USA|
| qwerty| 55A,AUS| human| 3| AUS|
| ABC123| zzz,RSA| animal| 4| RSA|
+------------+---------+-------+--------------+
如何汇总此数据框以获得以下结果-
>>> new_df.show(3)
+------------+---------+-------+--------------+
| user_id| address| type|count| country|
+------------+---------+-------+-----+--------+
| qwerty| 55A,AUS| human| 3| AUS|
| ABC123| xxx,USA| animal| 5| USA|
+------------+---------+-------+--------------+
对于给定的user_id
:
-
获取计数总和最高的
country
对于第 1 步得到的country
,获取计数最高的address
我猜我必须将 my_df
拆分为 2 个不同的数据帧,并分别获取 country
和 address
。但我并不完全知道它的语法。感谢您的帮助。谢谢。
【问题讨论】:
我会尝试窗口函数 -.select
中的 first
,然后是 .groupby
和 agg
,按 user_id 和您想从 first
获取的 2 列。这可能会有所帮助:***.com/questions/35142216/…
我不太明白那个答案..
请看下面我的回答。
【参考方案1】:
我的意思是这样的:
>>> import pandas as pd
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.window import *
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName('abc').getOrCreate()
>>> data = "user_id": ["ABC123", "ABC123", "qwerty", "ABC123"], "address": ["yyy,USA", "xxx,USA", "55A,AUS", "zzz,RSA"], "type": ["animal", "animal", "human", "animal"], "count": [2,3,3,4], "country": ["USA", "USA", "AUS", "RSA"]
>>> df = pd.DataFrame(data=data)
>>> df_pyspark = spark.createDataFrame(df)
>>> w = Window().partitionBy("user_id", "country").orderBy((col("count").desc()))
>>> w2 = Window().partitionBy("user_id").orderBy(col("sum_country").desc())
>>> df_pyspark.select("user_id", "address", "type", "count", "country", sum("count").over(w).alias("sum_country")).select("user_id", first("country").over(w2).alias("top_country"), first("address").over(w).alias("top_address"), "country").where(col("top_country")==col("country")).distinct().show()
+-------+-----------+-----------+-------+
|user_id|top_country|top_address|country|
+-------+-----------+-----------+-------+
| qwerty| AUS| 55A,AUS| AUS|
| ABC123| USA| xxx,USA| USA|
+-------+-----------+-----------+-------+
您可以添加类型、计数等,具体取决于您希望使用的逻辑 - 您可以执行与 top_address
相同的操作(即 first
函数),也可以 groupBy
和agg
【讨论】:
这很好用。虽然,当我添加type
和 count
列时,df_pyspark.count()
与我没有这些列时不同。我做了df_pyspark.select("user_id", "address", "type", "count", "country", sum("count").over(w).alias("sum_country")).select("user_id", first("country").over(w2).alias("top_country"), first("address").over(w).alias("top_address"), "country", "type", col("sum_country").alias("count")).where(col("top_country")==col("country")).distinct().show()
。我该怎么办?
另外,type
对于给定的user_id
将始终保持不变。
嗯,这应该可以正常工作。所以上面,如果你用.count()
替换.show()
应该返回2,它对你显示不同吗?
我没听懂你。顺便说一句,我刚刚检查了数据 - 似乎有重复的行,因为对于给定的 user_id、地址、类型、国家 的相同组合存在 多个不同的计数。这怎么发生的?我应该在最后的df_pyspark
上做一个groupBy("user_id", "address", "type", "country").sum()
吗?
是的,因为distinct()
仅删除每行的重复项(对于所有列),因此您可以将count
类似地合并到窗口函数中,例如address
所以:first("count").over(w).alias("top_count")
,所以让它和窗口一样。或者你可以做 groupBy
agg
并采取例如最大限度。来自count
根据:spark.apache.org/docs/2.1.0/api/python/…以上是关于PySpark 汇总数据框的主要内容,如果未能解决你的问题,请参考以下文章
为啥 PySpark 中的 agg() 一次只能汇总一列? [复制]
pyspark 从 pyspark sql 数据框创建字典数据