PySpark:根据另一列的顺序收集数据框列上的集合

Posted

技术标签:

【中文标题】PySpark:根据另一列的顺序收集数据框列上的集合【英文标题】:PySpark: collect_set on dataframe colum based on order of another column 【发布时间】:2019-10-21 16:47:54 【问题描述】:

我有一个看起来有点像这样的 Spark 数据框:

id  country  date        action
 1    A   2019-01-01   suppress
 1    A   2019-01-02   suppress
 2    A   2019-01-03   bid-up
 2    A   2019-01-04   bid-down
 3    C   2019-01-01   no-action
 3    C   2019-01-02   bid-up
 4    D   2019-01-01   suppress

我想通过按 id、国家分组并将“action”列的唯一值收集到一个数组中来减少这个数据框,但是这个数组应该按日期列排序。

例如

id  country action_arr
 1    A      [suppress]
 2    A      [bid-up, bid-down]
 3    C      [no-action, bid-up]
 4    D      [suppress]

为了更简洁地解释这一点,我有一些 SQL (presto) 代码可以完全满足我的需求......我只是在 PySpark 或 SparkSQL 中努力做到这一点:

SELECT id, country, array_distinct(array_agg(action ORDER BY date ASC)) AS actions
FROM table
GROUP BY id, country

现在这是我在 PySpark 中的尝试:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('action').orderBy('date')

sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))

然后我想按组找出每组动作的出现次数:

df = sorted_list_df.select('country', 'sorted_list').groupBy('coutry', 'sorted_list').agg(F.count('sorted_list'))

代码运行但在输出中他的 sorted_list 列与没有任何数组聚合的操作基本相同..有人可以帮忙吗?

编辑:我几乎得到了我想要的东西。但结果与 presto 结果不完全匹配。谁能解释为什么?解决方法如下:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('action').orderBy('date')

df_2 = df.withColumn("sorted_list", F.collect_set("action").over(Window.partitionBy("id").orderBy("date")))

test = df_2.select('id', 'country', 'sorted_list')\
           .dropDuplicates()\
           .select('country', 'sorted_list')\
           .groupBy('site_name', 'sorted_list')\
           .agg(F.count('sorted_list'))

【问题讨论】:

为什么返回错误的结果,你认为什么样的结果是正确的? @GrzegorzSkibinski - 编辑描述以澄清 【参考方案1】:

IMO,您的窗口定义错误。您应该按要分组的列进行分区,然后为每个组收集一组唯一值。

IIUC,你只需要这样做:

w = Window.partitionBy(['id', 'country']).orderBy('date')

sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))

df_new = sorted_list_df.select('id', 'country', 'sorted_list').withColumn("count_of_elems", F.size("sorted_list"))

缺点

如果您使用窗口,您将为每一行设置一个新集,并且您的行数将与旧的 df 相同。本身不会有聚合,因为我认为这也不是你想要的。

下一行将每个组的值聚合为一组。我希望它能得到你想要的:

df_new = sorted_list_df.groupby('id', 'country').agg(F.max('sorted_list').alias('sorted_list')).withColumn("count_of_elems", F.size("sorted_list"))

【讨论】:

以上是关于PySpark:根据另一列的顺序收集数据框列上的集合的主要内容,如果未能解决你的问题,请参考以下文章

根据另一列的值过滤数据框列[重复]

基于针对另一列的参考表更新 Pandas 数据框列的问题

根据另一列的元素从 pyspark 数组中删除元素

在另一列上查找最近的时间戳并在新列中添加值 PySpark

Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串

PySpark - 根据另一列值的降序添加递增的整数排名值