PySpark:根据另一列的顺序收集数据框列上的集合
Posted
技术标签:
【中文标题】PySpark:根据另一列的顺序收集数据框列上的集合【英文标题】:PySpark: collect_set on dataframe colum based on order of another column 【发布时间】:2019-10-21 16:47:54 【问题描述】:我有一个看起来有点像这样的 Spark 数据框:
id country date action
1 A 2019-01-01 suppress
1 A 2019-01-02 suppress
2 A 2019-01-03 bid-up
2 A 2019-01-04 bid-down
3 C 2019-01-01 no-action
3 C 2019-01-02 bid-up
4 D 2019-01-01 suppress
我想通过按 id、国家分组并将“action”列的唯一值收集到一个数组中来减少这个数据框,但是这个数组应该按日期列排序。
例如
id country action_arr
1 A [suppress]
2 A [bid-up, bid-down]
3 C [no-action, bid-up]
4 D [suppress]
为了更简洁地解释这一点,我有一些 SQL (presto) 代码可以完全满足我的需求......我只是在 PySpark 或 SparkSQL 中努力做到这一点:
SELECT id, country, array_distinct(array_agg(action ORDER BY date ASC)) AS actions
FROM table
GROUP BY id, country
现在这是我在 PySpark 中的尝试:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('action').orderBy('date')
sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))
然后我想按组找出每组动作的出现次数:
df = sorted_list_df.select('country', 'sorted_list').groupBy('coutry', 'sorted_list').agg(F.count('sorted_list'))
代码运行但在输出中他的 sorted_list 列与没有任何数组聚合的操作基本相同..有人可以帮忙吗?
编辑:我几乎得到了我想要的东西。但结果与 presto 结果不完全匹配。谁能解释为什么?解决方法如下:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('action').orderBy('date')
df_2 = df.withColumn("sorted_list", F.collect_set("action").over(Window.partitionBy("id").orderBy("date")))
test = df_2.select('id', 'country', 'sorted_list')\
.dropDuplicates()\
.select('country', 'sorted_list')\
.groupBy('site_name', 'sorted_list')\
.agg(F.count('sorted_list'))
【问题讨论】:
为什么返回错误的结果,你认为什么样的结果是正确的? @GrzegorzSkibinski - 编辑描述以澄清 【参考方案1】:IMO,您的窗口定义错误。您应该按要分组的列进行分区,然后为每个组收集一组唯一值。
IIUC,你只需要这样做:
w = Window.partitionBy(['id', 'country']).orderBy('date')
sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))
df_new = sorted_list_df.select('id', 'country', 'sorted_list').withColumn("count_of_elems", F.size("sorted_list"))
缺点:
如果您使用窗口,您将为每一行设置一个新集,并且您的行数将与旧的 df 相同。本身不会有聚合,因为我认为这也不是你想要的。
下一行将每个组的值聚合为一组。我希望它能得到你想要的:
df_new = sorted_list_df.groupby('id', 'country').agg(F.max('sorted_list').alias('sorted_list')).withColumn("count_of_elems", F.size("sorted_list"))
【讨论】:
以上是关于PySpark:根据另一列的顺序收集数据框列上的集合的主要内容,如果未能解决你的问题,请参考以下文章