PySpark groupby 和最大值选择
Posted
技术标签:
【中文标题】PySpark groupby 和最大值选择【英文标题】:PySpark groupby and max value selection 【发布时间】:2016-11-30 13:23:39 【问题描述】:我有一个类似的 PySpark 数据框
name city date
satya Mumbai 13/10/2016
satya Pune 02/11/2016
satya Mumbai 22/11/2016
satya Pune 29/11/2016
satya Delhi 30/11/2016
panda Delhi 29/11/2016
brata BBSR 28/11/2016
brata Goa 30/10/2016
brata Goa 30/10/2016
我需要为每个名称找出最喜欢的城市,逻辑是“如果城市在聚合 'name'+'city' 对中具有最大城市出现次数,则将城市作为 fav_city”。如果发现多个相同的事件,则考虑具有最新日期的城市。将解释:
d = df.groupby('name','city').count()
#name city count
brata Goa 2 #clear favourite
brata BBSR 1
panda Delhi 1 #as single so clear favourite
satya Pune 2 ##Confusion
satya Mumbai 2 ##confusion
satya Delhi 1 ##shd be discard as other cities having higher count than this city
#So get cities having max count
dd = d.groupby('name').agg(F.max('count').alias('count'))
ddd = dd.join(d,['name','count'],'left')
#name count city
brata 2 Goa #fav found
panda 1 Delhi #fav found
satya 2 Mumbai #can't say
satya 2 Pune #can't say
如果是用户“satya”,我需要返回 trx_history 并获取具有 equal_max 计数的城市的最新日期:e 来自最后交易的“孟买”或“浦那”(最大日期),将该城市视为最喜欢的城市。在这种情况下,“Pune”作为“29/11/2016”是最新/最大日期。
但我无法进一步说明如何完成。
请在逻辑上帮助我,或者如果有更好的解决方案(更快/紧凑的方式),请提出建议。谢谢。
【问题讨论】:
【参考方案1】:首先将日期转换为DateType
:
import pyspark.sql.functions as F
df_with_date = df.withColumn(
"date",
F.to_date("date", "dd/MM/yyyy")
# For Spark < 2.2
# F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)
下一个groupBy
用户和城市,但像这样扩展聚合:
df_agg = (df_with_date
.groupBy("name", "city")
.agg(F.count("city").alias("count"), F.max("date").alias("max_date")))
定义一个窗口:
from pyspark.sql.window import Window
w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))
添加排名:
df_with_rank = (df_agg
.withColumn("rank", F.dense_rank().over(w)))
并过滤:
result = df_with_rank.where(F.col("rank") == 1)
您可以使用如下代码检测剩余的重复项:
import sys
final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)
【讨论】:
【参考方案2】:d = df.groupby('name','city').count()
#name city count
brata Goa 2 #clear favourite
brata BBSR 1
panda Delhi 1 #as single so clear favourite
satya Pune 2 ##Confusion
satya Mumbai 2 ##confusion
satya Delhi 1 ##shd be discard as other cities having higher count than this city
#So get cities having max count
dd = d.groupby('name').count().sort(F.col('count').desc())
display(dd.take(1))
【讨论】:
请为您的回答提供一些背景信息。它将帮助其他人轻松理解您的方法。以上是关于PySpark groupby 和最大值选择的主要内容,如果未能解决你的问题,请参考以下文章
Postgres 在 groupby 和 max 之后选择 *