PySpark 汇总数据框

Posted

技术标签:

【中文标题】PySpark 汇总数据框【英文标题】:PySpark rolling up a dataframe 【发布时间】:2019-09-19 21:21:19 【问题描述】:

我有以下数据框-

>>> my_df.show(3)
+------------+---------+-------+--------------+
|     user_id|  address|   type|count| country|
+------------+---------+-------+-----+--------+
|      ABC123|  yyy,USA| animal|    2|     USA|
|      ABC123|  xxx,USA| animal|    3|     USA|
|      qwerty|  55A,AUS|  human|    3|     AUS|
|      ABC123|  zzz,RSA| animal|    4|     RSA|
+------------+---------+-------+--------------+

如何汇总此数据框以获得以下结果-

>>> new_df.show(3)
+------------+---------+-------+--------------+
|     user_id|  address|   type|count| country|
+------------+---------+-------+-----+--------+
|      qwerty|  55A,AUS|  human|    3|     AUS|
|      ABC123|  xxx,USA| animal|    5|     USA|
+------------+---------+-------+--------------+ 

对于给定的user_id

    获取计数总和最高的country 对于第 1 步得到的country,获取计数最高的address

我猜我必须将 my_df 拆分为 2 个不同的数据帧,并分别获取 countryaddress。但我并不完全知道它的语法。感谢您的帮助。谢谢。

【问题讨论】:

我会尝试窗口函数 - .select 中的 first,然后是 .groupbyagg,按 user_id 和您想从 first 获取的 2 列。这可能会有所帮助:***.com/questions/35142216/… 我不太明白那个答案.. 请看下面我的回答。 【参考方案1】:

我的意思是这样的:

>>> import pandas as pd
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.window import *
>>> from pyspark.sql import SparkSession

>>> spark = SparkSession.builder.appName('abc').getOrCreate()

>>> data = "user_id": ["ABC123", "ABC123", "qwerty", "ABC123"], "address": ["yyy,USA", "xxx,USA", "55A,AUS", "zzz,RSA"], "type": ["animal", "animal", "human", "animal"], "count": [2,3,3,4], "country": ["USA", "USA", "AUS", "RSA"]

>>> df = pd.DataFrame(data=data)

>>> df_pyspark = spark.createDataFrame(df)

>>> w = Window().partitionBy("user_id", "country").orderBy((col("count").desc()))

>>> w2 = Window().partitionBy("user_id").orderBy(col("sum_country").desc())

>>> df_pyspark.select("user_id", "address", "type", "count", "country", sum("count").over(w).alias("sum_country")).select("user_id", first("country").over(w2).alias("top_country"), first("address").over(w).alias("top_address"), "country").where(col("top_country")==col("country")).distinct().show()
+-------+-----------+-----------+-------+
|user_id|top_country|top_address|country|
+-------+-----------+-----------+-------+
| qwerty|        AUS|    55A,AUS|    AUS|
| ABC123|        USA|    xxx,USA|    USA|
+-------+-----------+-----------+-------+

您可以添加类型、计数等,具体取决于您希望使用的逻辑 - 您可以执行与 top_address 相同的操作(即 first 函数),也可以 groupByagg

【讨论】:

这很好用。虽然,当我添加 typecount 列时,df_pyspark.count() 与我没有这些列时不同。我做了df_pyspark.select("user_id", "address", "type", "count", "country", sum("count").over(w).alias("sum_country")).select("user_id", first("country").over(w2).alias("top_country"), first("address").over(w).alias("top_address"), "country", "type", col("sum_country").alias("count")).where(col("top_country")==col("country")).distinct().show()。我该怎么办? 另外,type 对于给定的user_id 将始终保持不变。 嗯,这应该可以正常工作。所以上面,如果你用.count() 替换.show() 应该返回2,它对你显示不同吗? 我没听懂你。顺便说一句,我刚刚检查了数据 - 似乎有重复的行,因为对于给定的 user_id、地址、类型、国家 的相同组合存在 多个不同的计数。这怎么发生的?我应该在最后的df_pyspark 上做一个groupBy("user_id", "address", "type", "country").sum() 吗? 是的,因为distinct() 仅删除每行的重复项(对于所有列),因此您可以将count 类似地合并到窗口函数中,例如address 所以:first("count").over(w).alias("top_count"),所以让它和窗口一样。或者你可以做 groupBy agg 并采取例如最大限度。来自count 根据:spark.apache.org/docs/2.1.0/api/python/…

以上是关于PySpark 汇总数据框的主要内容,如果未能解决你的问题,请参考以下文章

为啥 PySpark 中的 agg() 一次只能汇总一列? [复制]

Pyspark - 获取具有条件的列的累积总和

满足条件时计算的pyspark udf

pyspark 从 pyspark sql 数据框创建字典数据

python, pyspark : 获取 pyspark 数据框列值的总和

如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较