Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序

Posted

技术标签:

【中文标题】Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序【英文标题】:Groupby and collect_list maintaining order based on another column in PySpark 【发布时间】:2020-01-17 07:04:15 【问题描述】:

我有一个像这样的 PySpark 数据框,

+----------+------------+------------+------------+
|   Name   |  dateCol1  |  dateCol2  |  dateCol3  |
+----------+------------+------------+------------+
| user1    | 2018-01-01 | 2018-01-10 | 2018-01-01 |
| user1    | 2018-01-11 | 2018-01-20 | 2018-01-01 |
| user2    | 2018-01-11 | 2018-01-20 | 2018-01-11 |
| user1    | 2019-01-21 | 2018-01-30 | 2018-01-01 |
+----------+------------+------------+------------+

我想根据键、dateCol1 和 dateCol2 以及列名称上的 collect_list 对这个数据集进行分组。为此,我正在使用代码,

spark_df.groupBy('dateCol1', 'dateCol2').agg(F.collect_list('Name'))

在收集要列出的列时,我还想根据列 dateCol3 维护值的顺序。

例如,我想确保对于dateCol1 == '2018-01-11'dateCol2 == '2018-01-20',收集到列表我将始终得到[user1, user2](基于dateCol3 的顺序)。

数据帧所需的输出是,

+------------+------------+----------------+
|  dateCol1  |  dateCol2  |    List        |
+------------+------------+----------------+
| 2018-01-01 | 2018-01-10 | [user1]        |
| 2018-01-11 | 2018-01-20 | [user1, user2] |
| 2019-01-21 | 2018-01-30 | [user1]        |
+------------+------------+----------------+

collect_list 默认情况下不会保持顺序,如何确保收集的列表是基于数据框中的另一个外部列排序的?

【问题讨论】:

@YOLO,我想在collect_list基于dateCol3创建列表时保持顺序。根据 dateCol3 然后 collect_list 对数据框进行排序不一致。 【参考方案1】:

你可以试试:

spark_df.orderBy('dateCol3', ascending=True).groupBy('dateCol1', 'dateCol2').agg(F.collect_list('Name'))

或者,虽然有点矫枉过正,但您可以使用窗口化:

from pyspark.sql import Window as w

spark_df.select('dateCol1', 'dateCol2', F.collect_list('Name').over(w.partitionBy(['dateCol1','dateCol2']).orderBy(F.col('dateCol3'))).alias('Name')).distinct()

【讨论】:

我正在寻找基于窗口的方法。我猜基于 dateCol3 的分区和排序将确保按顺序收集列表。不是吗? 是的,正是这个想法。 解决方案几乎是正确的,但获得了额外的行。 distinct 不会删除某些行。我想知道我是否可以在 groupBy 中对 collect_list 进行分区和排序。否则,我必须再次使用窗口并 dropDuplicates 保持顺序的键 是的,你可以把它放在.groupBy(...) 之后.select(...) - 这也可以。但是我认为 distinct() 应该为您做,除非您有更多的列要添加到那里和/或更多的转换

以上是关于Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark groupby 然后在组内排序

Hive---collect_list和collect_set

在 Spark SQL 中使用 collect_list 和 collect_set

如何使用collect_list?

PySpark 使用 collect_list 收集不同长度的数组

如何在 MySQL 中对 concat_ws(',',collect_list(a)) 进行排序?